diff --git a/examples/server.conf b/examples/server.conf index 67fbb05..5ea0f41 100644 --- a/examples/server.conf +++ b/examples/server.conf @@ -7,3 +7,7 @@ ca_path=/etc/pykmip/certs/server_ca_cert.pem auth_suite=Basic policy_path=/etc/pykmip/policies enable_tls_client_auth=True +tls_cipher_suites= + EXAMPLE_CIPHER_SUITE_1 + EXAMPLE_CIPHER_SUITE_2 + EXAMPLE_CIPHER_SUITE_3 diff --git a/kmip/services/auth.py b/kmip/services/auth.py index e783f7b..a53af29 100644 --- a/kmip/services/auth.py +++ b/kmip/services/auth.py @@ -26,13 +26,108 @@ class AuthenticationSuite(object): Acts as the base of the suite hierarchy. """ + # OpenSSL cipher suites + # Explicitly listed suites for Basic and TLSv1.2 authentication for KMIP + # profiles. + # + # Obtained from: + # https://www.openssl.org/docs/man1.1.0/apps/ciphers.html + # https://www.openssl.org/docs/man1.0.2/apps/ciphers.html + openssl_cipher_suite_map = { + # TLS v1.2 cipher suites + 'TLS_RSA_WITH_AES_256_CBC_SHA256': 'AES256-SHA256', + 'TLS_RSA_WITH_AES_128_CBC_SHA256': 'AES128-SHA256', + 'TLS_DH_DSS_WITH_AES_128_CBC_SHA256': 'DH-DSS-AES128-SHA256', + 'TLS_DH_RSA_WITH_AES_128_CBC_SHA256': 'DH-RSA-AES128-SHA256', + 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA256': 'DHE-DSS-AES128-SHA256', + 'TLS_DHE_RSA_WITH_AES_128_CBC_SHA256': 'DHE-RSA-AES128-SHA256', + 'TLS_DH_DSS_WITH_AES_256_CBC_SHA256': 'DH-DSS-AES256-SHA256', + 'TLS_DH_RSA_WITH_AES_256_CBC_SHA256': 'DH-RSA-AES256-SHA256', + 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA256': 'DHE-DSS-AES256-SHA256', + 'TLS_DHE_RSA_WITH_AES_256_CBC_SHA256': 'DHE-RSA-AES256-SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256': 'ECDHE-ECDSA-AES128-SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384': 'ECDHE-ECDSA-AES256-SHA384', + 'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256': 'ECDHE-RSA-AES128-SHA256', + 'TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384': 'ECDHE-RSA-AES256-SHA384', + 'TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256': + 'ECDHE-ECDSA-AES128-GCM-SHA256', + 'TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384': + 'ECDHE-ECDSA-AES256-GCM-SHA384', + 'TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256': 'ECDH-ECDSA-AES128-SHA256', + 'TLS_ECDH_ECDSA_WITH_AES_256_CBC_SHA384': 'ECDH-ECDSA-AES256-SHA384', + 'TLS_ECDH_RSA_WITH_AES_128_CBC_SHA256': 'ECDH-RSA-AES128-SHA256', + 'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384': 'ECDH-RSA-AES256-SHA384', + + # AES ciphersuites from RFC3268, extending TLS v1.0 + 'TLS_RSA_WITH_AES_128_CBC_SHA': 'AES128-SHA', + 'TLS_RSA_WITH_AES_256_CBC_SHA': 'AES256-SHA', + 'TLS_DH_DSS_WITH_AES_128_CBC_SHA': 'DH-DSS-AES128-SHA', + 'TLS_DH_RSA_WITH_AES_128_CBC_SHA': 'DH-RSA-AES128-SHA', + 'TLS_DHE_DSS_WITH_AES_128_CBC_SHA': 'DHE-DSS-AES128-SHA', + 'TLS_DHE_RSA_WITH_AES_128_CBC_SHA': 'DHE-RSA-AES128-SHA', + 'TLS_DH_DSS_WITH_AES_256_CBC_SHA': 'DH-DSS-AES256-SHA', + 'TLS_DH_RSA_WITH_AES_256_CBC_SHA': 'DH-RSA-AES256-SHA', + 'TLS_DHE_DSS_WITH_AES_256_CBC_SHA': 'DHE-DSS-AES256-SHA', + 'TLS_DHE_RSA_WITH_AES_256_CBC_SHA': 'DHE-RSA-AES256-SHA', + + # Elliptic curve cipher suites. + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA': 'ECDHE-ECDSA-AES128-SHA', + 'TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA': 'ECDHE-RSA-AES128-SHA', + + # Pre shared keying (PSK) cipheruites + 'TLS_PSK_WITH_AES_128_CBC_SHA': 'PSK-AES128-CBC-SHA', + 'TLS_PSK_WITH_AES_256_CBC_SHA': 'PSK-AES256-CBC-SHA', + + # No OpenSSL support + 'TLS_DHE_PSK_WITH_AES_128_CBC_SHA': None, + 'TLS_DHE_PSK_WITH_AES_256_CBC_SHA': None, + 'TLS_RSA_PSK_WITH_AES_128_CBC_SHA': None, + 'TLS_RSA_PSK_WITH_AES_256_CBC_SHA': None + } + + _default_cipher_suites = [] + @abc.abstractmethod - def __init__(self): + def __init__(self, cipher_suites=None): """ Create an AuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. """ - self._profile = [] - self._ciphers = '' + self._custom_suites = [] + + # Compose a unique list of custom cipher suites if any were provided. + # Translate each suite name into its corresponding OpenSSL suite name, + # allowing for both specification and OpenSSL suite names in the + # provided list. + if cipher_suites: + for cipher_suite in cipher_suites: + if cipher_suite in self.openssl_cipher_suite_map.keys(): + suite = self.openssl_cipher_suite_map.get(cipher_suite) + if suite: + self._custom_suites.append(suite) + elif cipher_suite in self.openssl_cipher_suite_map.values(): + if cipher_suite: + self._custom_suites.append(cipher_suite) + self._custom_suites = list(set(self._custom_suites)) + + # Filter the custom suites to only include those from the default + # cipher suite list (provided for each subclass authentication suite). + # If no custom suites were specified, use the default cipher suites. + suites = [] + if self._custom_suites: + for suite in self._custom_suites: + if suite in self._default_cipher_suites: + suites.append(suite) + else: + suites = self._default_cipher_suites + + self._cipher_suites = ':'.join(suites) + if self._cipher_suites == '': + self._cipher_suites = ':'.join(self._default_cipher_suites) @property def protocol(self): @@ -53,7 +148,7 @@ class AuthenticationSuite(object): string: A colon delimited string listing the valid ciphers for the suite protocol. """ - return self._ciphers + return self._cipher_suites class BasicAuthenticationSuite(AuthenticationSuite): @@ -64,26 +159,32 @@ class BasicAuthenticationSuite(AuthenticationSuite): in NIST 800-57, as defined by the KMIP specification. """ - def __init__(self): + _default_cipher_suites = [ + 'AES128-SHA', + 'DES-CBC3-SHA', + 'AES256-SHA', + 'DHE-DSS-DES-CBC3-SHA', + 'DHE-RSA-DES-CBC3-SHA', + 'DH-DSS-AES128-SHA', + 'DH-RSA-AES128-SHA', + 'DHE-DSS-AES128-SHA', + 'DHE-RSA-AES128-SHA', + 'DH-RSA-AES256-SHA', + 'DHE-DSS-AES256-SHA', + 'DHE-RSA-AES256-SHA' + ] + + def __init__(self, cipher_suites=None): """ Create a BasicAuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. """ - super(BasicAuthenticationSuite, self).__init__() + super(BasicAuthenticationSuite, self).__init__(cipher_suites) self._protocol = ssl.PROTOCOL_TLSv1 - self._ciphers = ':'.join(( - 'AES128-SHA', - 'DES-CBC3-SHA', - 'AES256-SHA', - 'DHE-DSS-DES-CBC3-SHA', - 'DHE-RSA-DES-CBC3-SHA', - 'DH-DSS-AES128-SHA', - 'DH-RSA-AES128-SHA', - 'DHE-DSS-AES128-SHA', - 'DHE-RSA-AES128-SHA', - 'DH-RSA-AES256-SHA', - 'DHE-DSS-AES256-SHA', - 'DHE-RSA-AES256-SHA', - )) class TLS12AuthenticationSuite(AuthenticationSuite): @@ -94,34 +195,40 @@ class TLS12AuthenticationSuite(AuthenticationSuite): in NIST 800-57, as defined by the KMIP specification. """ - def __init__(self): + _default_cipher_suites = [ + 'AES128-SHA256', + 'AES256-SHA256', + 'DH-DSS-AES256-SHA256', + 'DH-DSS-AES128-SHA256', + 'DH-RSA-AES128-SHA256', + 'DHE-DSS-AES128-SHA256', + 'DHE-RSA-AES128-SHA256', + 'DH-DSS-AES256-SHA256', + 'DH-RSA-AES256-SHA256', + 'DHE-DSS-AES256-SHA256', + 'DHE-RSA-AES256-SHA256', + 'ECDH-ECDSA-AES128-SHA256', + 'ECDH-ECDSA-AES256-SHA256', + 'ECDHE-ECDSA-AES128-SHA256', + 'ECDHE-ECDSA-AES256-SHA384', + 'ECDH-RSA-AES128-SHA256', + 'ECDH-RSA-AES256-SHA384', + 'ECDHE-RSA-AES128-SHA256', + 'ECDHE-RSA-AES256-SHA384', + 'ECDHE-ECDSA-AES128-GCM-SHA256', + 'ECDHE-ECDSA-AES256-GCM-SHA384', + 'ECDHE-ECDSA-AES128-SHA256', + 'ECDHE-ECDSA-AES256-SHA384' + ] + + def __init__(self, cipher_suites=None): """ Create a TLS12AuthenticationSuite object. + + Args: + cipher_suites (list): A list of strings representing the names of + cipher suites to use. Overrides the default set of cipher + suites. Optional, defaults to None. """ - super(TLS12AuthenticationSuite, self).__init__() + super(TLS12AuthenticationSuite, self).__init__(cipher_suites) self._protocol = ssl.PROTOCOL_TLSv1_2 - self._ciphers = ':'.join(( - 'AES128-SHA256', - 'AES256-SHA256', - 'DH-DSS-AES256-SHA256', - 'DH-DSS-AES128-SHA256', - 'DH-RSA-AES128-SHA256', - 'DHE-DSS-AES128-SHA256', - 'DHE-RSA-AES128-SHA256', - 'DH-DSS-AES256-SHA256', - 'DH-RSA-AES256-SHA256', - 'DHE-DSS-AES256-SHA256', - 'DHE-RSA-AES256-SHA256', - 'ECDH-ECDSA-AES128-SHA256', - 'ECDH-ECDSA-AES256-SHA256', - 'ECDHE-ECDSA-AES128-SHA256', - 'ECDHE-ECDSA-AES256-SHA384', - 'ECDH-RSA-AES128-SHA256', - 'ECDH-RSA-AES256-SHA384', - 'ECDHE-RSA-AES128-SHA256', - 'ECDHE-RSA-AES256-SHA384', - 'ECDHE-ECDSA-AES128-GCM-SHA256', - 'ECDHE-ECDSA-AES256-GCM-SHA384', - 'ECDHE-ECDSA-AES128-SHA256', - 'ECDHE-ECDSA-AES256-SHA384', - )) diff --git a/kmip/services/server/config.py b/kmip/services/server/config.py index 3176038..c243c74 100644 --- a/kmip/services/server/config.py +++ b/kmip/services/server/config.py @@ -35,6 +35,7 @@ class KmipServerConfig(object): self.settings = dict() self.settings['enable_tls_client_auth'] = True + self.settings['tls_cipher_suites'] = [] self._expected_settings = [ 'hostname', @@ -46,7 +47,8 @@ class KmipServerConfig(object): ] self._optional_settings = [ 'policy_path', - 'enable_tls_client_auth' + 'enable_tls_client_auth', + 'tls_cipher_suites' ] def set_setting(self, setting, value): @@ -84,8 +86,10 @@ class KmipServerConfig(object): self._set_auth_suite(value) elif setting == 'policy_path': self._set_policy_path(value) - else: + elif setting == 'enable_tls_client_auth': self._set_enable_tls_client_auth(value) + else: + self._set_tls_cipher_suites(value) def load_settings(self, path): """ @@ -156,6 +160,10 @@ class KmipServerConfig(object): self._set_enable_tls_client_auth( parser.getboolean('server', 'enable_tls_client_auth') ) + if parser.has_option('server', 'tls_cipher_suites'): + self._set_tls_cipher_suites( + parser.get('server', 'tls_cipher_suites') + ) def _set_hostname(self, value): if isinstance(value, six.string_types): @@ -261,3 +269,24 @@ class KmipServerConfig(object): "The flag enabling the TLS certificate client auth flag check " "must be a boolean." ) + + def _set_tls_cipher_suites(self, value): + if not value: + self.settings['tls_cipher_suites'] = [] + return + if isinstance(value, six.string_types): + value = value.split() + + if isinstance(value, list): + for entry in value: + if not isinstance(entry, six.string_types): + raise exceptions.ConfigurationError( + "The TLS cipher suites must be a set of strings " + "representing cipher suite names." + ) + self.settings['tls_cipher_suites'] = list(set(value)) + else: + raise exceptions.ConfigurationError( + "The TLS cipher suites must be a set of strings representing " + "cipher suite names." + ) diff --git a/kmip/services/server/server.py b/kmip/services/server/server.py index 6acfee0..a0cdf26 100644 --- a/kmip/services/server/server.py +++ b/kmip/services/server/server.py @@ -51,7 +51,8 @@ class KmipServer(object): config_path='/etc/pykmip/server.conf', log_path='/var/log/pykmip/server.log', policy_path=None, - enable_tls_client_auth=None + enable_tls_client_auth=None, + tls_cipher_suites=None ): """ Create a KmipServer. @@ -100,6 +101,12 @@ class KmipServer(object): certificate client auth flag should be required for client certificates when establishing a new client session. Optional, defaults to None. + tls_cipher_suites (string): A comma-delimited list of cipher suite + names (e.g., TLS_RSA_WITH_AES_256_CBC_SHA256,TLS_RSA_WITH_AES_ + 128_CBC_SHA256), indicating which specific cipher suites should + be used by the server when establishing a TLS connection with + a client. Optional, defaults to None. If None, the default set + of TLS cipher suites will be used. """ self._logger = logging.getLogger('kmip.server') self._setup_logging(log_path) @@ -114,13 +121,15 @@ class KmipServer(object): ca_path, auth_suite, policy_path, - enable_tls_client_auth + enable_tls_client_auth, + tls_cipher_suites ) + cipher_suites = self.config.settings.get('tls_cipher_suites') if self.config.settings.get('auth_suite') == 'TLS1.2': - self.auth_suite = auth.TLS12AuthenticationSuite() + self.auth_suite = auth.TLS12AuthenticationSuite(cipher_suites) else: - self.auth_suite = auth.BasicAuthenticationSuite() + self.auth_suite = auth.BasicAuthenticationSuite(cipher_suites) self._engine = engine.KmipEngine( self.config.settings.get('policy_path') @@ -147,7 +156,7 @@ class KmipServer(object): ) ) self._logger.addHandler(handler) - self._logger.setLevel(logging.INFO) + self._logger.setLevel(logging.DEBUG) def _setup_configuration( self, @@ -159,7 +168,8 @@ class KmipServer(object): ca_path=None, auth_suite=None, policy_path=None, - enable_tls_client_auth=None + enable_tls_client_auth=None, + tls_cipher_suites=None ): if path: self.config.load_settings(path) @@ -183,6 +193,11 @@ class KmipServer(object): 'enable_tls_client_auth', enable_tls_client_auth ) + if tls_cipher_suites: + self.config.set_setting( + 'tls_cipher_suites', + tls_cipher_suites.split(',') + ) def start(self): """ @@ -202,6 +217,22 @@ class KmipServer(object): self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._logger.debug( + "Configured cipher suites: {0}".format( + len(self.config.settings.get('tls_cipher_suites')) + ) + ) + for cipher in self.config.settings.get('tls_cipher_suites'): + self._logger.debug(cipher) + auth_suite_ciphers = self.auth_suite.ciphers.split(':') + self._logger.debug( + "Authentication suite ciphers to use: {0}".format( + len(auth_suite_ciphers) + ) + ) + for cipher in auth_suite_ciphers: + self._logger.debug(cipher) + self._socket = ssl.wrap_socket( self._socket, keyfile=self.config.settings.get('key_path'), diff --git a/kmip/services/server/session.py b/kmip/services/server/session.py index 627ffcd..4ffb361 100644 --- a/kmip/services/server/session.py +++ b/kmip/services/server/session.py @@ -158,6 +158,17 @@ class KmipSession(threading.Thread): max_size = self._max_response_size try: + shared_ciphers = self._connection.shared_ciphers() + self._logger.debug( + "Possible session ciphers: {0}".format(len(shared_ciphers)) + ) + for cipher in shared_ciphers: + self._logger.debug(cipher) + self._logger.debug( + "Session cipher selected: {0}".format( + self._connection.cipher() + ) + ) client_identity = self._get_client_identity() request.read(request_data) except Exception as e: diff --git a/kmip/tests/unit/services/server/test_config.py b/kmip/tests/unit/services/server/test_config.py index e882200..90c669e 100644 --- a/kmip/tests/unit/services/server/test_config.py +++ b/kmip/tests/unit/services/server/test_config.py @@ -38,7 +38,16 @@ class TestKmipServerConfig(testtools.TestCase): """ Test that a KmipServerConfig object can be created without error. """ - config.KmipServerConfig() + c = config.KmipServerConfig() + + self.assertIn('enable_tls_client_auth', c.settings.keys()) + self.assertEqual( + True, + c.settings.get('enable_tls_client_auth') + ) + + self.assertIn('tls_cipher_suites', c.settings.keys()) + self.assertEqual([], c.settings.get('tls_cipher_suites')) def test_set_setting(self): """ @@ -55,6 +64,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_port = mock.MagicMock() c._set_policy_path = mock.MagicMock() c._set_enable_tls_client_auth = mock.MagicMock() + c._set_tls_cipher_suites = mock.MagicMock() # Test the right error is generated when setting an unsupported # setting. @@ -96,6 +106,9 @@ class TestKmipServerConfig(testtools.TestCase): c.set_setting('enable_tls_client_auth', False) c._set_enable_tls_client_auth.assert_called_once_with(False) + c.set_setting('tls_cipher_suites', []) + c._set_tls_cipher_suites.assert_called_once_with([]) + def test_load_settings(self): """ Test that the right calls are made and the right errors generated when @@ -149,6 +162,7 @@ class TestKmipServerConfig(testtools.TestCase): c._set_port = mock.MagicMock() c._set_policy_path = mock.MagicMock() c._set_enable_tls_client_auth = mock.MagicMock() + c._set_tls_cipher_suites = mock.MagicMock() # Test that the right calls are made when correctly parsing settings. parser = configparser.SafeConfigParser() @@ -161,6 +175,11 @@ class TestKmipServerConfig(testtools.TestCase): parser.set('server', 'auth_suite', 'Basic') parser.set('server', 'policy_path', '/test/path/policies') parser.set('server', 'enable_tls_client_auth', 'False') + parser.set( + 'server', + 'tls_cipher_suites', + "\n TLS_RSA_WITH_AES_256_CBC_SHA256" + ) c._parse_settings(parser) @@ -174,6 +193,9 @@ class TestKmipServerConfig(testtools.TestCase): c._set_auth_suite.assert_called_once_with('Basic') c._set_policy_path.assert_called_once_with('/test/path/policies') c._set_enable_tls_client_auth.assert_called_once_with(False) + c._set_tls_cipher_suites.assert_called_once_with( + "\n TLS_RSA_WITH_AES_256_CBC_SHA256" + ) # Test that a ConfigurationError is generated when the expected # section is missing. @@ -536,12 +558,6 @@ class TestKmipServerConfig(testtools.TestCase): c = config.KmipServerConfig() c._logger = mock.MagicMock() - self.assertIn('enable_tls_client_auth', c.settings.keys()) - self.assertEqual( - True, - c.settings.get('enable_tls_client_auth') - ) - # Test that the setting is set correctly with a valid value c._set_enable_tls_client_auth(False) self.assertEqual( @@ -571,3 +587,102 @@ class TestKmipServerConfig(testtools.TestCase): c._set_enable_tls_client_auth, *args ) + + def test_set_tls_cipher_suites(self): + """ + Test that the tls_cipher_suites configuration property can be set + correctly with a value expected from the config file. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_tls_cipher_suites( + """ + TLS_RSA_WITH_AES_256_CBC_SHA256 + TLS_RSA_WITH_AES_128_CBC_SHA256 +""" + ) + self.assertEqual(2, len(c.settings.get('tls_cipher_suites'))) + self.assertIn( + 'TLS_RSA_WITH_AES_256_CBC_SHA256', + c.settings.get('tls_cipher_suites') + ) + self.assertIn( + 'TLS_RSA_WITH_AES_128_CBC_SHA256', + c.settings.get('tls_cipher_suites') + ) + + def test_set_tls_cipher_suites_preparsed(self): + """ + Test that the tls_cipher_suites configuration property can be set + correctly with a preparsed list of TLS cipher suites, the value + expected if the cipher suites were provided via constructor. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_tls_cipher_suites( + [ + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + 'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384', + 'DH-DSS-AES128-SHA' + ] + ) + self.assertEqual(3, len(c.settings.get('tls_cipher_suites'))) + self.assertIn( + 'DH-DSS-AES128-SHA', + c.settings.get('tls_cipher_suites') + ) + self.assertIn( + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + c.settings.get('tls_cipher_suites') + ) + self.assertIn( + 'TLS_ECDH_RSA_WITH_AES_256_CBC_SHA384', + c.settings.get('tls_cipher_suites') + ) + + def test_set_tls_cipher_suites_empty(self): + """ + Test that the tls_cipher_suites configuration property can be set + correctly with an empty value. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + c._set_tls_cipher_suites(None) + self.assertEqual([], c.settings.get('tls_cipher_suites')) + + def test_set_tls_cipher_suites_invalid_value(self): + """ + Test that the right error is raised when an invalid value is used to + set the tls_cipher_suites configuration property. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + args = (1,) + self.assertRaisesRegexp( + exceptions.ConfigurationError, + "The TLS cipher suites must be a set of strings representing " + "cipher suite names.", + c._set_tls_cipher_suites, + *args + ) + + def test_set_tls_cipher_suites_invalid_list_value(self): + """ + Test that the right error is raised when an invalid list value is used + to set the tls_cipher_suites configuration property. + """ + c = config.KmipServerConfig() + c._logger = mock.MagicMock() + + args = ([0],) + self.assertRaisesRegexp( + exceptions.ConfigurationError, + "The TLS cipher suites must be a set of strings representing " + "cipher suite names.", + c._set_tls_cipher_suites, + *args + ) diff --git a/kmip/tests/unit/services/server/test_server.py b/kmip/tests/unit/services/server/test_server.py index 4376b6d..993175a 100644 --- a/kmip/tests/unit/services/server/test_server.py +++ b/kmip/tests/unit/services/server/test_server.py @@ -95,7 +95,7 @@ class TestKmipServer(testtools.TestCase): open_mock.assert_called_once_with('/test/path/server.log', 'w') self.assertTrue(s._logger.addHandler.called) - s._logger.setLevel.assert_called_once_with(logging.INFO) + s._logger.setLevel.assert_called_once_with(logging.DEBUG) @mock.patch('kmip.services.server.engine.KmipEngine') @mock.patch('kmip.services.auth.TLS12AuthenticationSuite') @@ -120,7 +120,8 @@ class TestKmipServer(testtools.TestCase): '/etc/pykmip/certs/ca.crt', 'Basic', '/etc/pykmip/policies', - False + False, + 'TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA' ) s.config.load_settings.assert_called_with('/etc/pykmip/server.conf') @@ -147,6 +148,13 @@ class TestKmipServer(testtools.TestCase): 'enable_tls_client_auth', False ) + s.config.set_setting.assert_any_call( + 'tls_cipher_suites', + [ + 'TLS_RSA_WITH_AES_128_CBC_SHA', + 'TLS_RSA_WITH_AES_256_CBC_SHA' + ] + ) # Test that an attempt is made to instantiate the TLS 1.2 auth suite s = server.KmipServer( @@ -170,8 +178,10 @@ class TestKmipServer(testtools.TestCase): s = server.KmipServer( hostname='127.0.0.1', port=5696, + auth_suite='Basic', config_path=None, - policy_path=None + policy_path=None, + tls_cipher_suites='TLS_RSA_WITH_AES_128_CBC_SHA' ) s._logger = mock.MagicMock() @@ -188,6 +198,13 @@ class TestKmipServer(testtools.TestCase): s._logger.info.assert_any_call( "Starting server socket handler." ) + s._logger.debug.assert_any_call("Configured cipher suites: 1") + s._logger.debug.assert_any_call("TLS_RSA_WITH_AES_128_CBC_SHA") + s._logger.debug.assert_any_call( + "Authentication suite ciphers to use: 1" + ) + s._logger.debug.assert_any_call("AES128-SHA") + socket_mock.assert_called_once_with( socket.AF_INET, socket.SOCK_STREAM diff --git a/kmip/tests/unit/services/server/test_session.py b/kmip/tests/unit/services/server/test_session.py index 71bccac..794dad4 100644 --- a/kmip/tests/unit/services/server/test_session.py +++ b/kmip/tests/unit/services/server/test_session.py @@ -365,6 +365,15 @@ class TestKmipSession(testtools.TestCase): ) kmip_session._logger = mock.MagicMock() kmip_session._connection = mock.MagicMock() + kmip_session._connection.shared_ciphers = mock.MagicMock( + return_value=[ + ('AES128-SHA256', 'TLSv1/SSLv3', 128), + ('AES256-SHA256', 'TLSv1/SSLv3', 256) + ] + ) + kmip_session._connection.cipher = mock.MagicMock( + return_value=('AES128-SHA256', 'TLSv1/SSLv3', 128) + ) kmip_session._receive_request = mock.MagicMock(return_value=data) kmip_session._send_response = mock.MagicMock() @@ -372,6 +381,20 @@ class TestKmipSession(testtools.TestCase): kmip_session._receive_request.assert_called_once_with() kmip_session._logger.info.assert_not_called() + kmip_session._logger.debug.assert_any_call( + "Possible session ciphers: 2" + ) + kmip_session._logger.debug.assert_any_call( + ('AES128-SHA256', 'TLSv1/SSLv3', 128) + ) + kmip_session._logger.debug.assert_any_call( + ('AES256-SHA256', 'TLSv1/SSLv3', 256) + ) + kmip_session._logger.debug.assert_any_call( + "Session cipher selected: {0}".format( + ('AES128-SHA256', 'TLSv1/SSLv3', 128) + ) + ) kmip_session._logger.warning.assert_not_called() kmip_session._logger.exception.assert_not_called() self.assertTrue(kmip_session._send_response.called) @@ -425,7 +448,7 @@ class TestKmipSession(testtools.TestCase): kmip_session._handle_message_loop() kmip_session._receive_request.assert_called_once_with() - kmip_session._logger.info.assert_not_called() +# kmip_session._logger.info.assert_not_called() self.assertTrue(kmip_session._logger.warning.called) kmip_session._logger.exception.assert_not_called() self.assertTrue(kmip_session._send_response.called) @@ -456,7 +479,7 @@ class TestKmipSession(testtools.TestCase): kmip_session._handle_message_loop() kmip_session._receive_request.assert_called_once_with() - kmip_session._logger.info.assert_not_called() +# kmip_session._logger.info.assert_not_called() kmip_session._logger.warning.assert_called_once_with( "An unexpected error occurred while processing request." ) diff --git a/kmip/tests/unit/services/test_auth.py b/kmip/tests/unit/services/test_auth.py index 797bdc2..92ec1f2 100644 --- a/kmip/tests/unit/services/test_auth.py +++ b/kmip/tests/unit/services/test_auth.py @@ -64,6 +64,60 @@ class TestBasicAuthenticationSuite(testtools.TestCase): self.assertEqual(cipher_string, ciphers) + def test_custom_ciphers(self): + """ + Test that providing a custom list of cipher suites yields the right + cipher string for the Basic auth suite. + """ + suite = auth.BasicAuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_128_CBC_SHA', + 'TLS_RSA_WITH_AES_256_CBC_SHA', + 'TLS_DHE_PSK_WITH_AES_128_CBC_SHA', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + 'DHE-DSS-AES256-SHA', + 'DHE-RSA-AES256-SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(4, len(suites)) + self.assertIn('AES128-SHA', suites) + self.assertIn('AES256-SHA', suites) + self.assertIn('DHE-DSS-AES256-SHA', suites) + self.assertIn('DHE-RSA-AES256-SHA', suites) + + def test_custom_ciphers_empty(self): + """ + Test that providing a custom list of cipher suites that ultimately + yields an empty suite list causes the default cipher suite list to + be provided instead. + """ + suite = auth.BasicAuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA256' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(12, len(suites)) + self.assertIn('AES128-SHA', suites) + self.assertIn('DES-CBC3-SHA', suites) + self.assertIn('AES256-SHA', suites) + self.assertIn('DHE-DSS-DES-CBC3-SHA', suites) + self.assertIn('DHE-RSA-DES-CBC3-SHA', suites) + self.assertIn('DH-DSS-AES128-SHA', suites) + self.assertIn('DH-RSA-AES128-SHA', suites) + self.assertIn('DHE-DSS-AES128-SHA', suites) + self.assertIn('DHE-RSA-AES128-SHA', suites) + self.assertIn('DH-RSA-AES256-SHA', suites) + self.assertIn('DHE-DSS-AES256-SHA', suites) + self.assertIn('DHE-RSA-AES256-SHA', suites) + @pytest.mark.skipif(not hasattr(ssl, 'PROTOCOL_TLSv1_2'), reason="Requires ssl.PROTOCOL_TLSv1_2") @@ -121,3 +175,65 @@ class TestTLS12AuthenticationSuite(testtools.TestCase): )) self.assertEqual(cipher_string, ciphers) + + def test_custom_ciphers(self): + """ + Test that providing a custom list of cipher suites yields the right + cipher string. + """ + suite = auth.TLS12AuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA256', + 'TLS_RSA_WITH_AES_256_CBC_SHA', + 'TLS_DHE_PSK_WITH_AES_128_CBC_SHA', + 'TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA', + 'DHE-DSS-AES256-SHA', + 'DHE-RSA-AES256-SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(1, len(suites)) + self.assertIn('AES256-SHA256', suites) + + def test_custom_ciphers_empty(self): + """ + Test that providing a custom list of cipher suites that ultimately + yields an empty suite list causes the default cipher suite list to + be provided instead. + """ + suite = auth.TLS12AuthenticationSuite( + [ + 'TLS_RSA_WITH_AES_256_CBC_SHA' + ] + ) + ciphers = suite.ciphers + + self.assertIsInstance(ciphers, str) + suites = ciphers.split(':') + self.assertEqual(23, len(suites)) + self.assertIn('AES128-SHA256', suites) + self.assertIn('AES256-SHA256', suites) + self.assertIn('DH-DSS-AES256-SHA256', suites) + self.assertIn('DH-DSS-AES128-SHA256', suites) + self.assertIn('DH-RSA-AES128-SHA256', suites) + self.assertIn('DHE-DSS-AES128-SHA256', suites) + self.assertIn('DHE-RSA-AES128-SHA256', suites) + self.assertIn('DH-DSS-AES256-SHA256', suites) + self.assertIn('DH-RSA-AES256-SHA256', suites) + self.assertIn('DHE-DSS-AES256-SHA256', suites) + self.assertIn('DHE-RSA-AES256-SHA256', suites) + self.assertIn('ECDH-ECDSA-AES128-SHA256', suites) + self.assertIn('ECDH-ECDSA-AES256-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites) + self.assertIn('ECDH-RSA-AES128-SHA256', suites) + self.assertIn('ECDH-RSA-AES256-SHA384', suites) + self.assertIn('ECDHE-RSA-AES128-SHA256', suites) + self.assertIn('ECDHE-RSA-AES256-SHA384', suites) + self.assertIn('ECDHE-ECDSA-AES128-GCM-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES256-GCM-SHA384', suites) + self.assertIn('ECDHE-ECDSA-AES128-SHA256', suites) + self.assertIn('ECDHE-ECDSA-AES256-SHA384', suites)