1
0
mirror of https://github.com/thib8956/nginx-proxy synced 2025-02-24 17:58:16 +00:00
nginx-proxy/test/test_ssl/test_dhparam.py
polarathene e81001615c tests: Improve error handling output and use regular string comparisons
As this project isn't exactly python focused apart from the test suite, I'll assume other contributors are probably not as experienced with python either. Since this is a rather technical test, the extra comments should help grok the functionality without floundering around with the docs.

When the subprocess raises an exception due to an issue with the command (_eg using `-CAfile` arg to `openssl` with an invalid path_), the tests would output large walls of text that wasn't particularly helpful in troubleshooting the issue. `stderr` was also leaking out inbetween the test case results in the terminal, this has been resolved by ensuring that output is caught and piped, which keeps it available to python when an exception is raised. Identifying the actual error cause and location is now much nicer.

Updated the output to be plain string content instead of byte strings, this works fine :)
2021-09-28 21:59:53 +13:00

197 lines
7.3 KiB
Python

import re
import subprocess
import backoff
import docker
import pytest
docker_client = docker.from_env()
###############################################################################
#
# Tests helpers
#
###############################################################################
@backoff.on_exception(backoff.constant, AssertionError, interval=2, max_tries=15, jitter=None)
def assert_log_contains(expected_log_line, container_name="nginxproxy"):
"""
Check that the nginx-proxy container log contains a given string.
The backoff decorator will retry the check 15 times with a 2 seconds delay.
:param expected_log_line: string to search for
:return: None
:raises: AssertError if the expected string is not found in the log
"""
sut_container = docker_client.containers.get(container_name)
docker_logs = sut_container.logs(stdout=True, stderr=True, stream=False, follow=False)
assert bytes(expected_log_line, encoding="utf8") in docker_logs
def require_openssl(required_version):
"""
This function checks that the required version of OpenSSL is present, and skips the test if not.
Use it as a test function decorator:
@require_openssl("2.3.4")
def test_something():
...
:param required_version: minimal required version as a string: "1.2.3"
"""
def versiontuple(v):
clean_v = re.sub(r"[^\d\.]", "", v)
return tuple(map(int, (clean_v.split("."))))
try:
command_output = subprocess.check_output(["openssl", "version"])
except OSError:
return pytest.mark.skip("openssl command is not available in test environment")
else:
if not command_output:
raise Exception("Could not get openssl version")
openssl_version = str(command_output.split()[1])
return pytest.mark.skipif(
versiontuple(openssl_version) < versiontuple(required_version),
reason=f"openssl v{openssl_version} is less than required version {required_version}")
@require_openssl("1.0.2")
def negotiate_cipher(sut_container, additional_params='', grep='Cipher is'):
host = f"{sut_container.attrs['NetworkSettings']['IPAddress']}:443"
try:
# Enforce TLS 1.2 as newer versions don't support custom dhparam or ciphersuite preference.
# The empty `echo` is to provide `openssl` user input, so that the process exits: https://stackoverflow.com/a/28567565
# `shell=True` enables using a single string to execute as a shell command.
# `text=True` prevents the need to compare against byte strings.
# `stderr=subprocess.PIPE` removes the output to stderr being interleaved with test case status (output during exceptions).
return subprocess.check_output(
f"echo '' | openssl s_client -connect {host} -tls1_2 {additional_params} | grep '{grep}'",
shell=True,
text=True,
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError as e:
# Output a more helpful error, the original exception in this case isn't that helpful.
# `from None` to ignore undesired output from exception chaining.
raise Exception("Failed to process CLI request:\n" + e.stderr) from None
def can_negotiate_dhe_ciphersuite(sut_container):
r = negotiate_cipher(sut_container, "-cipher 'EDH'")
assert "New, TLSv1.2, Cipher is DHE-RSA-AES256-GCM-SHA384\n" == r
r2 = negotiate_cipher(sut_container, "-cipher 'EDH'", "Server Temp Key")
assert "DH" in r2
def cannot_negotiate_dhe_ciphersuite(sut_container):
# Fail to negotiate a DHE cipher suite:
r = negotiate_cipher(sut_container, "-cipher 'EDH'")
assert "New, (NONE), Cipher is (NONE)\n" == r
# Correctly establish a connection (TLS 1.2):
r2 = negotiate_cipher(sut_container)
assert "New, TLSv1.2, Cipher is ECDHE-RSA-AES256-GCM-SHA384\n" == r2
r3 = negotiate_cipher(sut_container, grep="Server Temp Key")
assert "X25519" in r3
# Parse array of container ENV, splitting at the `=` and returning the value, otherwise `None`
def get_env(sut_container, var):
env = sut_container.attrs['Config']['Env']
for e in env:
if e.startswith(var):
return e.split('=')[1]
return None
###############################################################################
#
# Tests
#
###############################################################################
def test_default_dhparam_is_ffdhe4096(docker_compose):
container_name="dh-default"
sut_container = docker_client.containers.get(container_name)
assert sut_container.status == "running"
assert_log_contains("Setting up DH Parameters..", container_name)
# Make sure the dhparam file used is the default ffdhe4096.pem:
default_checksum = sut_container.exec_run("md5sum /app/dhparam/ffdhe4096.pem").output.split()
current_checksum = sut_container.exec_run("md5sum /etc/nginx/dhparam/dhparam.pem").output.split()
assert default_checksum[0] == current_checksum[0]
can_negotiate_dhe_ciphersuite(sut_container)
def test_can_change_dhparam_group(docker_compose):
container_name="dh-env"
sut_container = docker_client.containers.get(container_name)
assert sut_container.status == "running"
assert_log_contains("Setting up DH Parameters..", container_name)
# Make sure the dhparam file used is ffdhe2048.pem, not the default (ffdhe4096.pem):
default_checksum = sut_container.exec_run("md5sum /app/dhparam/ffdhe2048.pem").output.split()
current_checksum = sut_container.exec_run("md5sum /etc/nginx/dhparam/dhparam.pem").output.split()
assert default_checksum[0] == current_checksum[0]
can_negotiate_dhe_ciphersuite(sut_container)
def test_fail_if_dhparam_group_not_supported(docker_compose):
container_name="invalid-group-1024"
sut_container = docker_client.containers.get(container_name)
assert sut_container.status == "exited"
DHPARAM_BITS = get_env(sut_container, "DHPARAM_BITS")
assert DHPARAM_BITS == "1024"
assert_log_contains(
f"ERROR: Unsupported DHPARAM_BITS size: {DHPARAM_BITS}. Use: 2048, 3072, or 4096 (default).",
container_name
)
def test_custom_dhparam_is_supported(docker_compose):
container_name="dh-file"
sut_container = docker_client.containers.get(container_name)
assert sut_container.status == "running"
assert_log_contains(
"Warning: A custom dhparam.pem file was provided. Best practice is to use standardized RFC7919 DHE groups instead.",
container_name
)
# Make sure the dhparam file used is not the default (ffdhe4096.pem):
default_checksum = sut_container.exec_run("md5sum /app/dhparam/ffdhe4096.pem").output.split()
current_checksum = sut_container.exec_run("md5sum /etc/nginx/dhparam/dhparam.pem").output.split()
assert default_checksum[0] != current_checksum[0]
can_negotiate_dhe_ciphersuite(sut_container)
def test_can_skip_dhparam(docker_compose):
container_name="dh-skip"
sut_container = docker_client.containers.get(container_name)
assert sut_container.status == "running"
assert_log_contains("Skipping Diffie-Hellman parameters setup.", container_name)
cannot_negotiate_dhe_ciphersuite(sut_container)
def test_web5_https_works(docker_compose, nginxproxy):
r = nginxproxy.get("https://web5.nginx-proxy.tld/port", allow_redirects=False)
assert r.status_code == 200
assert "answer from port 85\n" in r.text