File: //snap/certbot/5057/lib/python3.12/site-packages/acme/_internal/tests/client_test.py
"""Tests for acme.client."""
# pylint: disable=too-many-lines
import copy
import datetime
import http.client as http_client
import json
import sys
import unittest
from unittest import mock
from cryptography import x509
import josepy as jose
import pytest
import requests
from acme import challenges
from acme import errors
from acme import jws as acme_jws
from acme import messages
from acme._internal.tests import messages_test
from acme._internal.tests import test_util
from acme.client import ClientNetwork
from acme.client import ClientV2
CERT_SAN_PEM = test_util.load_vector('cert-san.pem')
CSR_MIXED_PEM = test_util.load_vector('csr-mixed.pem')
CSR_NO_SANS_PEM = test_util.load_vector('csr-nosans.pem')
KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
DIRECTORY_V2 = messages.Directory({
'newAccount': 'https://www.letsencrypt-demo.org/acme/new-account',
'newNonce': 'https://www.letsencrypt-demo.org/acme/new-nonce',
'newOrder': 'https://www.letsencrypt-demo.org/acme/new-order',
'revokeCert': 'https://www.letsencrypt-demo.org/acme/revoke-cert',
'meta': messages.Directory.Meta(),
})
class ClientV2Test(unittest.TestCase):
"""Tests for acme.client.ClientV2."""
def setUp(self):
self.response = mock.MagicMock(
ok=True, status_code=http_client.OK, headers={}, links={})
self.net = mock.MagicMock()
self.net.post.return_value = self.response
self.net.get.return_value = self.response
self.identifier = messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com')
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public_key())
the_arg: dict = dict(reg)
self.new_reg = messages.NewRegistration(**the_arg)
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1')
# Authorization
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
challb = messages.ChallengeBody(
uri=(authzr_uri + '/1'), status=messages.STATUS_VALID,
chall=challenges.DNS(token=jose.b64decode(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')))
self.challr = messages.ChallengeResource(
body=challb, authzr_uri=authzr_uri)
self.authz = messages.Authorization(
identifier=messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='example.com'),
challenges=(challb,))
self.authzr = messages.AuthorizationResource(
body=self.authz, uri=authzr_uri)
# Reason code for revocation
self.rsn = 1
self.directory = DIRECTORY_V2
self.client = ClientV2(self.directory, self.net)
self.new_reg = self.new_reg.update(terms_of_service_agreed=True)
self.authzr_uri2 = 'https://www.letsencrypt-demo.org/acme/authz/2'
self.authz2 = self.authz.update(identifier=messages.Identifier(
typ=messages.IDENTIFIER_FQDN, value='www.example.com'),
status=messages.STATUS_PENDING)
self.authzr2 = messages.AuthorizationResource(
body=self.authz2, uri=self.authzr_uri2)
self.order = messages.Order(
identifiers=(self.authz.identifier, self.authz2.identifier),
status=messages.STATUS_PENDING,
authorizations=(self.authzr.uri, self.authzr_uri2),
finalize='https://www.letsencrypt-demo.org/acme/acct/1/order/1/finalize')
self.orderr = messages.OrderResource(
body=self.order,
uri='https://www.letsencrypt-demo.org/acme/acct/1/order/1',
authorizations=[self.authzr, self.authzr2], csr_pem=CSR_MIXED_PEM)
self.orderr2 = messages.OrderResource(
body=self.order,
uri='https://www.letsencrypt-demo.org/acme/acct/1/order/1',
authorizations=[self.authzr, self.authzr2], csr_pem=CSR_NO_SANS_PEM)
def test_new_account(self):
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
assert self.regr == self.client.new_account(self.new_reg)
def test_new_account_tos_link(self):
self.response.status_code = http_client.CREATED
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = self.regr.uri
self.response.links.update({
'terms-of-service': {'url': 'https://www.letsencrypt-demo.org/tos'},
})
assert self.client.new_account(self.new_reg).terms_of_service == \
'https://www.letsencrypt-demo.org/tos'
def test_new_account_conflict(self):
self.response.status_code = http_client.OK
self.response.headers['Location'] = self.regr.uri
with pytest.raises(errors.ConflictError):
self.client.new_account(self.new_reg)
def test_deactivate_account(self):
deactivated_regr = self.regr.update(
body=self.regr.body.update(status='deactivated'))
self.response.json.return_value = deactivated_regr.body.to_json()
self.response.status_code = http_client.OK
self.response.headers['Location'] = self.regr.uri
assert self.client.deactivate_registration(self.regr) == deactivated_regr
def test_deactivate_authorization(self):
deactivated_authz = self.authzr.update(
body=self.authzr.body.update(status=messages.STATUS_DEACTIVATED))
self.response.json.return_value = deactivated_authz.body.to_json()
authzr = self.client.deactivate_authorization(self.authzr)
assert deactivated_authz.body == authzr.body
assert self.client.net.post.call_count == 1
assert self.authzr.uri in self.net.post.call_args_list[0][0]
def test_new_order(self):
order_response = copy.deepcopy(self.response)
order_response.status_code = http_client.CREATED
order_response.json.return_value = self.order.to_json()
order_response.headers['Location'] = self.orderr.uri
self.net.post.return_value = order_response
authz_response = copy.deepcopy(self.response)
authz_response.json.return_value = self.authz.to_json()
authz_response.headers['Location'] = self.authzr.uri
authz_response2 = self.response
authz_response2.json.return_value = self.authz2.to_json()
authz_response2.headers['Location'] = self.authzr2.uri
with mock.patch('acme.client.ClientV2._post_as_get') as mock_post_as_get:
mock_post_as_get.side_effect = (authz_response, authz_response2)
assert self.client.new_order(CSR_MIXED_PEM) == self.orderr
with mock.patch('acme.client.ClientV2._post_as_get') as mock_post_as_get:
mock_post_as_get.side_effect = (authz_response, authz_response2)
assert self.client.new_order(CSR_NO_SANS_PEM) == self.orderr2
def test_answer_challege(self):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.to_json()
chall_response = challenges.DNSResponse(validation=None)
self.client.answer_challenge(self.challr.body, chall_response)
with pytest.raises(errors.UnexpectedUpdate):
self.client.answer_challenge(self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self):
with pytest.raises(errors.ClientError):
self.client.answer_challenge(self.challr.body, challenges.DNSResponse(validation=None))
@mock.patch('acme.client.datetime')
def test_poll_and_finalize(self, mock_datetime):
mock_datetime.datetime.now.return_value = datetime.datetime(2018, 2, 15)
mock_datetime.timedelta = datetime.timedelta
expected_deadline = mock_datetime.datetime.now() + datetime.timedelta(seconds=90)
self.client.poll_authorizations = mock.Mock(return_value=self.orderr)
self.client.finalize_order = mock.Mock(return_value=self.orderr)
assert self.client.poll_and_finalize(self.orderr) == self.orderr
self.client.poll_authorizations.assert_called_once_with(self.orderr, expected_deadline)
self.client.finalize_order.assert_called_once_with(self.orderr, expected_deadline)
@mock.patch('acme.client.datetime')
def test_poll_authorizations_timeout(self, mock_datetime):
now_side_effect = [datetime.datetime(2018, 2, 15),
datetime.datetime(2018, 2, 16),
datetime.datetime(2018, 2, 17)]
mock_datetime.datetime.now.side_effect = now_side_effect
self.response.json.side_effect = [
self.authz.to_json(), self.authz2.to_json(), self.authz2.to_json()]
with pytest.raises(errors.TimeoutError):
self.client.poll_authorizations(self.orderr, now_side_effect[1])
def test_poll_authorizations_failure(self):
deadline = datetime.datetime(9999, 9, 9)
challb = self.challr.body.update(status=messages.STATUS_INVALID,
error=messages.Error.with_code('unauthorized'))
authz = self.authz.update(status=messages.STATUS_INVALID, challenges=(challb,))
self.response.json.return_value = authz.to_json()
with pytest.raises(errors.ValidationError):
self.client.poll_authorizations(self.orderr, deadline)
def test_poll_authorizations_success(self):
deadline = datetime.datetime(9999, 9, 9)
updated_authz2 = self.authz2.update(status=messages.STATUS_VALID)
updated_authzr2 = messages.AuthorizationResource(
body=updated_authz2, uri=self.authzr_uri2)
updated_orderr = self.orderr.update(authorizations=[self.authzr, updated_authzr2])
self.response.json.side_effect = (
self.authz.to_json(), self.authz2.to_json(), updated_authz2.to_json())
assert self.client.poll_authorizations(self.orderr, deadline) == updated_orderr
def test_poll_unexpected_update(self):
updated_authz = self.authz.update(identifier=self.identifier.update(value='foo'))
self.response.json.return_value = updated_authz.to_json()
with pytest.raises(errors.UnexpectedUpdate):
self.client.poll(self.authzr)
def test_finalize_order_success(self):
updated_order = self.order.update(
certificate='https://www.letsencrypt-demo.org/acme/cert/',
status=messages.STATUS_VALID)
updated_orderr = self.orderr.update(body=updated_order, fullchain_pem=CERT_SAN_PEM)
self.response.json.return_value = updated_order.to_json()
self.response.text = CERT_SAN_PEM
deadline = datetime.datetime(9999, 9, 9)
assert self.client.finalize_order(self.orderr, deadline) == updated_orderr
def test_finalize_order_error(self):
updated_order = self.order.update(
error=messages.Error.with_code('unauthorized'),
status=messages.STATUS_INVALID)
self.response.json.return_value = updated_order.to_json()
deadline = datetime.datetime(9999, 9, 9)
with pytest.raises(errors.IssuanceError):
self.client.finalize_order(self.orderr, deadline)
@mock.patch('acme.client.ClientV2.begin_finalization')
def test_finalize_order_ready(self, mock_begin):
# https://github.com/certbot/certbot/issues/9766
updated_order_ready = self.order.update(status=messages.STATUS_READY)
updated_order_valid = self.order.update(
certificate='https://www.letsencrypt-demo.org/acme/cert/',
status=messages.STATUS_VALID)
updated_orderr = self.orderr.update(body=updated_order_valid, fullchain_pem=CERT_SAN_PEM)
self.response.text = CERT_SAN_PEM
self.response.json.side_effect = [updated_order_ready.to_json(),
updated_order_valid.to_json()]
deadline = datetime.datetime(9999, 9, 9)
assert self.client.finalize_order(self.orderr, deadline) == updated_orderr
assert self.response.json.call_count == 2
assert mock_begin.call_count == 2
def test_finalize_order_invalid_status(self):
# https://github.com/certbot/certbot/issues/9296
order = self.order.update(error=None, status=messages.STATUS_INVALID)
self.response.json.return_value = order.to_json()
with pytest.raises(errors.Error, match="The certificate order failed"):
self.client.finalize_order(self.orderr, datetime.datetime(9999, 9, 9))
@mock.patch('acme.client.time.sleep')
@mock.patch('acme.client.datetime')
def test_finalize_order_orderNotReady(self, dt_mock, mock_sleep):
# https://github.com/certbot/certbot/issues/9766
updated_order_processing = self.order.update(status=messages.STATUS_PROCESSING)
updated_order_ready = self.order.update(status=messages.STATUS_READY)
updated_order_valid = self.order.update(
certificate='https://www.letsencrypt-demo.org/acme/cert/',
status=messages.STATUS_VALID)
self.orderr.update(body=updated_order_valid, fullchain_pem=CERT_SAN_PEM)
self.response.text = CERT_SAN_PEM
self.response.json.side_effect = [updated_order_processing.to_json(),
updated_order_ready.to_json(),
updated_order_valid.to_json()]
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50'
post = mock.MagicMock()
post.side_effect = [messages.Error.with_code('orderNotReady'), # first begin_finalization
# sleep 1
self.response, # first poll_finalization poll --> returns processing
# retry-after sleep here
self.response, # second poll_finalization poll --> returns ready
mock.MagicMock(), # second begin_finalization
# sleep 1
self.response, # third poll_finalization poll --> returns valid
self.response # fetch cert
]
self.net.post = post
self.client.finalize_order(self.orderr, datetime.datetime(9999, 9, 9))
assert self.net.post.call_count == 6
assert mock_sleep.call_args_list == [((1,),), ((50,),), ((1,),)]
def test_finalize_order_otherErrorCode(self):
post = mock.MagicMock()
post.side_effect = [messages.Error.with_code('serverInternal')]
self.net.post = post
with pytest.raises(messages.Error):
self.client.finalize_order(self.orderr, datetime.datetime(9999, 9, 9))
def test_finalize_order_timeout(self):
deadline = datetime.datetime.now() - datetime.timedelta(seconds=60)
with pytest.raises(errors.TimeoutError):
self.client.finalize_order(self.orderr, deadline)
def test_finalize_order_alt_chains(self):
updated_order = self.order.update(
certificate='https://www.letsencrypt-demo.org/acme/cert/',
status=messages.STATUS_VALID
)
updated_orderr = self.orderr.update(body=updated_order,
fullchain_pem=CERT_SAN_PEM,
alternative_fullchains_pem=[CERT_SAN_PEM,
CERT_SAN_PEM])
self.response.json.return_value = updated_order.to_json()
self.response.text = CERT_SAN_PEM
self.response.headers['Link'] ='<https://example.com/acme/cert/1>;rel="alternate", ' + \
'<https://example.com/dir>;rel="index", ' + \
'<https://example.com/acme/cert/2>;title="foo";rel="alternate"'
deadline = datetime.datetime(9999, 9, 9)
resp = self.client.finalize_order(self.orderr, deadline, fetch_alternative_chains=True)
self.net.post.assert_any_call('https://example.com/acme/cert/1',
mock.ANY, new_nonce_url=mock.ANY)
self.net.post.assert_any_call('https://example.com/acme/cert/2',
mock.ANY, new_nonce_url=mock.ANY)
assert resp == updated_orderr
del self.response.headers['Link']
resp = self.client.finalize_order(self.orderr, deadline, fetch_alternative_chains=True)
assert resp == updated_orderr.update(alternative_fullchains_pem=[])
def test_revoke(self):
self.client.revoke(messages_test.CERT, self.rsn)
self.net.post.assert_called_once_with(
self.directory["revokeCert"], mock.ANY, new_nonce_url=DIRECTORY_V2['newNonce'])
def test_revoke_bad_status_raises_error(self):
self.response.status_code = http_client.METHOD_NOT_ALLOWED
with pytest.raises(errors.ClientError):
self.client.revoke(messages_test.CERT,
self.rsn)
def test_update_registration(self):
# "Instance of 'Field' has no to_json/update member" bug:
self.response.headers['Location'] = self.regr.uri
self.response.json.return_value = self.regr.body.to_json()
assert self.regr == self.client.update_registration(self.regr)
assert self.client.net.account is not None
assert self.client.net.post.call_count == 2
assert DIRECTORY_V2.newAccount in self.net.post.call_args_list[0][0]
self.response.json.return_value = self.regr.body.update(
contact=()).to_json()
def test_external_account_required_true(self):
self.client.directory = messages.Directory({
'meta': messages.Directory.Meta(external_account_required=True)
})
assert self.client.external_account_required()
def test_external_account_required_false(self):
self.client.directory = messages.Directory({
'meta': messages.Directory.Meta(external_account_required=False)
})
assert not self.client.external_account_required()
def test_external_account_required_default(self):
assert not self.client.external_account_required()
def test_query_registration_client(self):
self.response.json.return_value = self.regr.body.to_json()
self.response.headers['Location'] = 'https://www.letsencrypt-demo.org/acme/reg/1'
assert self.regr == self.client.query_registration(self.regr)
def test_post_as_get(self):
with mock.patch('acme.client.ClientV2._authzr_from_response') as mock_client:
mock_client.return_value = self.authzr2
self.client.poll(self.authzr2)
self.client.net.post.assert_called_once_with(
self.authzr2.uri, None,
new_nonce_url='https://www.letsencrypt-demo.org/acme/new-nonce')
self.client.net.get.assert_not_called()
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
assert datetime.datetime(1999, 12, 31, 23, 59, 59) == \
self.client.retry_after(response=self.response, default=10)
@mock.patch('acme.client.datetime')
def test_retry_after_invalid(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = 'foooo'
assert datetime.datetime(2015, 3, 27, 0, 0, 10) == \
self.client.retry_after(response=self.response, default=10)
@mock.patch('acme.client.datetime')
def test_retry_after_overflow(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
dt_mock.datetime.side_effect = datetime.datetime
self.response.headers['Retry-After'] = "Tue, 116 Feb 2016 11:50:00 MST"
assert datetime.datetime(2015, 3, 27, 0, 0, 10) == \
self.client.retry_after(response=self.response, default=10)
@mock.patch('acme.client.datetime')
def test_retry_after_seconds(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50'
assert datetime.datetime(2015, 3, 27, 0, 0, 50) == \
self.client.retry_after(response=self.response, default=10)
@mock.patch('acme.client.datetime')
def test_retry_after_missing(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27)
dt_mock.timedelta = datetime.timedelta
assert datetime.datetime(2015, 3, 27, 0, 0, 10) == \
self.client.retry_after(response=self.response, default=10)
def test_get_directory(self):
self.response.json.return_value = DIRECTORY_V2.to_json()
assert DIRECTORY_V2.to_partial_json() == \
ClientV2.get_directory('https://example.com/dir', self.net).to_partial_json()
@mock.patch('acme.client.datetime')
def test_renewal_time_expired_cert(self, dt_mock):
utc_now = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
dt_mock.datetime.now.return_value = utc_now
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
cert = x509.load_pem_x509_certificate(cert_pem)
t, _ = self.client.renewal_time(cert_pem)
assert t == cert.not_valid_after_utc
@mock.patch('acme.client.datetime')
def test_renewal_time_no_renewal_info(self, dt_mock):
utc_now = datetime.datetime(2025, 3, 15, tzinfo=datetime.timezone.utc)
dt_mock.datetime.now.return_value = utc_now
# A directory with no 'renewalInfo' should result in None.
self.client.directory = messages.Directory({})
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
t, _ = self.client.renewal_time(cert_pem)
assert t is None
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 30, 00, 00, 00),
)
t, _ = self.client.renewal_time(cert_pem)
assert t is None
@mock.patch('acme.client.datetime')
def test_renewal_time_with_renewal_info(self, dt_mock):
from cryptography import x509
from acme.client import _renewal_info_path_component
utc_now = datetime.datetime(2025, 3, 15, tzinfo=datetime.timezone.utc)
dt_mock.datetime.now.return_value = utc_now
dt_mock.timedelta = datetime.timedelta
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
self.client.directory = messages.Directory({
'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info',
})
self.response.json.return_value = {
"suggestedWindow": {
"start": "2025-03-14T01:01:01Z",
"end": "2025-03-14T01:01:01Z",
},
"message": "Keep those certs fresh"
}
t, _ = self.client.renewal_time(cert_pem)
cert_parsed = x509.load_pem_x509_certificate(cert_pem)
ari_path_component = _renewal_info_path_component(cert_parsed)
self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" +
ari_path_component,
content_type='application/json')
assert t == datetime.datetime(2025, 3, 14, 1, 1, 1, tzinfo=datetime.timezone.utc)
self.net.reset_mock()
self.response.json.return_value = {
"suggestedWindow": {
"start": "2025-03-16T01:01:01Z",
"end": "2025-03-17T01:01:01Z",
},
"message": "Keep those certs fresh"
}
t, _ = self.client.renewal_time(cert_pem)
self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" +
ari_path_component,
content_type='application/json')
assert t >= datetime.datetime(2025, 3, 16, 1, 1, 1, tzinfo=datetime.timezone.utc)
assert t <= datetime.datetime(2025, 3, 17, 1, 1, 1, tzinfo=datetime.timezone.utc)
@mock.patch('acme.client.datetime')
def test_renewal_time_renewal_info_errors(self, dt_mock):
def now(tzinfo=None):
return datetime.datetime(2025, 3, 15, tzinfo=tzinfo)
dt_mock.datetime.now.side_effect = now
dt_mock.timedelta = datetime.timedelta
dt_mock.timezone = datetime.timezone
self.client.directory = messages.Directory({
'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info',
})
# Failure to fetch the 'renewalInfo' URL should raise an ARIError with the exception raised
# by self.net.get as the __cause__ and a Retry-After 6 hours in the future
expected_cause = requests.exceptions.RequestException
expected_retry_after = now() + datetime.timedelta(seconds=6 * 60 * 60)
self.net.get.side_effect = expected_cause
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
with pytest.raises(errors.ARIError) as exception_info:
self.client.renewal_time(cert_pem)
assert isinstance(exception_info.value.__cause__, expected_cause)
assert exception_info.value.retry_after == expected_retry_after
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 30, 00, 00, 00),
)
with pytest.raises(errors.ARIError) as exception_info:
self.client.renewal_time(cert_pem)
assert isinstance(exception_info.value.__cause__, expected_cause)
assert exception_info.value.retry_after == expected_retry_after
@mock.patch('acme.client.datetime')
def test_renewal_time_returns_retry_after(self, dt_mock):
def now(tzinfo=None):
return datetime.datetime(2025, 3, 15, tzinfo=tzinfo)
dt_mock.datetime.now.side_effect = now
dt_mock.timedelta = datetime.timedelta
dt_mock.timezone = datetime.timezone
self.client.directory = messages.Directory({
'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info',
})
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
self.response.json.return_value = {
"suggestedWindow": {
"start": "2025-03-14T01:01:01Z",
"end": "2025-03-14T01:01:01Z",
},
"message": "Keep those certs fresh"
}
# With no explicit Retry-After in header, default to six hours
_, retry_after = self.client.renewal_time(cert_pem)
assert retry_after == datetime.datetime(2025, 3, 15, 6, 0, 0)
# With an explicit Retry-After in header, use that
self.response.headers['Retry-After'] = '100'
_, retry_after = self.client.renewal_time(cert_pem)
assert retry_after == datetime.datetime(2025, 3, 15, 00, 1, 40)
def test_renewal_info_path_component():
from cryptography import x509
from acme.client import _renewal_info_path_component
cert = x509.load_pem_x509_certificate(test_util.load_vector('rsa2048_cert.pem'))
assert _renewal_info_path_component(cert) == "fL5sRirC8VS5AtOQh9DfoAzYNCI.ALVG_VbBb5U7"
# From https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html appendix A.
ARI_TEST_CERT = b"""
-----BEGIN CERTIFICATE-----
MIIBQzCB66ADAgECAgUAh2VDITAKBggqhkjOPQQDAjAVMRMwEQYDVQQDEwpFeGFt
cGxlIENBMCIYDzAwMDEwMTAxMDAwMDAwWhgPMDAwMTAxMDEwMDAwMDBaMBYxFDAS
BgNVBAMTC2V4YW1wbGUuY29tMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEeBZu
7cbpAYNXZLbbh8rNIzuOoqOOtmxA1v7cRm//AwyMwWxyHz4zfwmBhcSrf47NUAFf
qzLQ2PPQxdTXREYEnKMjMCEwHwYDVR0jBBgwFoAUaYhba4dGQEHhs3uEe6CuLN4B
yNQwCgYIKoZIzj0EAwIDRwAwRAIge09+S5TZAlw5tgtiVvuERV6cT4mfutXIlwTb
+FYN/8oCIClDsqBklhB9KAelFiYt9+6FDj3z4KGVelYM5MdsO3pK
-----END CERTIFICATE-----
"""
cert = x509.load_pem_x509_certificate(ARI_TEST_CERT)
assert _renewal_info_path_component(cert) == "aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE"
if __name__ == '__main__':
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover
def make_cert_for_renewal(not_before, not_after) -> bytes:
"""
Return a PEM-encoded, self-signed certificate with the given dates.
"""
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization, hashes
# AKID and serial are the inputs to constructing the renewalInfo URL
akid = x509.AuthorityKeyIdentifier(b"1234", None, None)
serial = 56789
key = ec.generate_private_key(ec.SECP256R1())
cert = x509.CertificateBuilder(
issuer_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "Some Issuer")]),
subject_name=x509.Name([]),
public_key=key.public_key(),
serial_number=serial,
not_valid_before=not_before,
not_valid_after=not_after,
).add_extension(
x509.SubjectAlternativeName([x509.DNSName('example.com')]),
critical=False,
).add_extension(
akid,
critical=False,
).sign(
private_key=key,
algorithm=hashes.SHA256(),
)
return cert.public_bytes(serialization.Encoding.PEM)
class MockJSONDeSerializable(jose.JSONDeSerializable):
# pylint: disable=missing-docstring
def __init__(self, value):
self.value = value
def to_partial_json(self):
return {'foo': self.value}
@classmethod
def from_json(cls, jobj):
pass # pragma: no cover
class ClientNetworkTest(unittest.TestCase):
"""Tests for acme.client.ClientNetwork."""
def setUp(self):
self.verify_ssl = mock.MagicMock()
self.wrap_in_jws = mock.MagicMock(return_value=mock.sentinel.wrapped)
self.net = ClientNetwork(
key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl,
user_agent='acme-python-test')
self.response = mock.MagicMock(ok=True, status_code=http_client.OK)
self.response.headers = {}
self.response.links = {}
def test_init(self):
assert self.net.verify_ssl is self.verify_ssl
def test_wrap_in_jws(self):
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url")
jws = acme_jws.JWS.json_loads(jws_dump)
assert json.loads(jws.payload.decode()) == {'foo': 'foo'}
assert jws.signature.combined.nonce == b'Tg'
def test_wrap_in_jws_v2(self):
self.net.account = {'uri': 'acct-uri'}
# pylint: disable=protected-access
jws_dump = self.net._wrap_in_jws(
MockJSONDeSerializable('foo'), nonce=b'Tg', url="url")
jws = acme_jws.JWS.json_loads(jws_dump)
assert json.loads(jws.payload.decode()) == {'foo': 'foo'}
assert jws.signature.combined.nonce == b'Tg'
assert jws.signature.combined.kid == u'acct-uri'
assert jws.signature.combined.url == u'url'
def test_check_response_not_ok_jobj_no_error(self):
self.response.ok = False
self.response.json.return_value = {}
with mock.patch('acme.client.messages.Error.from_json') as from_json:
from_json.side_effect = jose.DeserializationError
# pylint: disable=protected-access
with pytest.raises(errors.ClientError):
self.net._check_response(self.response)
def test_check_response_not_ok_jobj_error(self):
self.response.ok = False
self.response.json.return_value = messages.Error.with_code(
'serverInternal', detail='foo', title='some title').to_json()
# pylint: disable=protected-access
with pytest.raises(messages.Error):
self.net._check_response(self.response)
def test_check_response_not_ok_no_jobj(self):
self.response.ok = False
self.response.json.side_effect = ValueError
# pylint: disable=protected-access
with pytest.raises(errors.ClientError):
self.net._check_response(self.response)
def test_check_response_ok_no_jobj_ct_required(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
with pytest.raises(errors.ClientError):
self.net._check_response(self.response,
content_type=self.net.JSON_CONTENT_TYPE)
def test_check_response_ok_no_jobj_no_ct(self):
self.response.json.side_effect = ValueError
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
assert self.response == self.net._check_response(self.response)
@mock.patch('acme.client.logger')
def test_check_response_ok_ct_with_charset(self, mock_logger):
self.response.json.return_value = {}
self.response.headers['Content-Type'] = 'application/json; charset=utf-8'
# pylint: disable=protected-access
assert self.response == self.net._check_response(
self.response, content_type='application/json')
try:
mock_logger.debug.assert_called_with(
'Ignoring wrong Content-Type (%r) for JSON decodable response',
'application/json; charset=utf-8'
)
except AssertionError:
return
raise AssertionError('Expected Content-Type warning ' #pragma: no cover
'to not have been logged')
@mock.patch('acme.client.logger')
def test_check_response_ok_bad_ct(self, mock_logger):
self.response.json.return_value = {}
self.response.headers['Content-Type'] = 'text/plain'
# pylint: disable=protected-access
assert self.response == self.net._check_response(
self.response, content_type='application/json')
mock_logger.debug.assert_called_with(
'Ignoring wrong Content-Type (%r) for JSON decodable response',
'text/plain'
)
def test_check_response_conflict(self):
self.response.ok = False
self.response.status_code = 409
# pylint: disable=protected-access
with pytest.raises(errors.ConflictError):
self.net._check_response(self.response)
def test_check_response_jobj(self):
self.response.json.return_value = {}
for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']:
self.response.headers['Content-Type'] = response_ct
# pylint: disable=protected-access
assert self.response == self.net._check_response(self.response)
def test_send_request(self):
self.net.session = mock.MagicMock()
self.net.session.request.return_value = self.response
# pylint: disable=protected-access
assert self.response == self.net._send_request(
'HEAD', 'http://example.com/', 'foo', bar='baz')
self.net.session.request.assert_called_once_with(
'HEAD', 'http://example.com/', 'foo',
headers=mock.ANY, verify=mock.ANY, timeout=mock.ANY, bar='baz')
@mock.patch('acme.client.logger')
def test_send_request_get_der(self, mock_logger):
self.net.session = mock.MagicMock()
self.net.session.request.return_value = mock.MagicMock(
ok=True, status_code=http_client.OK,
content=b"hi")
# pylint: disable=protected-access
self.net._send_request('HEAD', 'http://example.com/', 'foo',
timeout=mock.ANY, bar='baz', headers={'Accept': 'application/pkix-cert'})
mock_logger.debug.assert_called_with(
'Received response:\nHTTP %d\n%s\n\n%s', 200,
'', b'aGk=')
def test_send_request_post(self):
self.net.session = mock.MagicMock()
self.net.session.request.return_value = self.response
# pylint: disable=protected-access
assert self.response == self.net._send_request(
'POST', 'http://example.com/', 'foo', data='qux', bar='baz')
self.net.session.request.assert_called_once_with(
'POST', 'http://example.com/', 'foo',
headers=mock.ANY, verify=mock.ANY, timeout=mock.ANY, data='qux', bar='baz')
def test_send_request_verify_ssl(self):
# pylint: disable=protected-access
for verify in True, False:
self.net.session = mock.MagicMock()
self.net.session.request.return_value = self.response
self.net.verify_ssl = verify
# pylint: disable=protected-access
assert self.response == \
self.net._send_request('GET', 'http://example.com/')
self.net.session.request.assert_called_once_with(
'GET', 'http://example.com/', verify=verify,
timeout=mock.ANY, headers=mock.ANY)
def test_send_request_user_agent(self):
self.net.session = mock.MagicMock()
# pylint: disable=protected-access
self.net._send_request('GET', 'http://example.com/',
headers={'bar': 'baz'})
self.net.session.request.assert_called_once_with(
'GET', 'http://example.com/', verify=mock.ANY,
timeout=mock.ANY,
headers={'User-Agent': 'acme-python-test', 'bar': 'baz'})
self.net._send_request('GET', 'http://example.com/',
headers={'User-Agent': 'foo2'})
self.net.session.request.assert_called_with(
'GET', 'http://example.com/',
verify=mock.ANY, timeout=mock.ANY, headers={'User-Agent': 'foo2'})
def test_send_request_timeout(self):
self.net.session = mock.MagicMock()
# pylint: disable=protected-access
self.net._send_request('GET', 'http://example.com/',
headers={'bar': 'baz'})
self.net.session.request.assert_called_once_with(
mock.ANY, mock.ANY, verify=mock.ANY, headers=mock.ANY,
timeout=45)
def test_del(self, close_exception=None):
sess = mock.MagicMock()
if close_exception is not None:
sess.close.side_effect = close_exception
self.net.session = sess
del self.net
sess.close.assert_called_once_with()
def test_del_error(self):
self.test_del(ReferenceError)
@mock.patch('acme.client.requests')
def test_requests_error_passthrough(self, mock_requests):
mock_requests.exceptions = requests.exceptions
mock_requests.request.side_effect = requests.exceptions.RequestException
# pylint: disable=protected-access
with pytest.raises(requests.exceptions.RequestException):
self.net._send_request('GET', 'uri')
class ClientNetworkWithMockedResponseTest(unittest.TestCase):
"""Tests for acme.client.ClientNetwork which mock out response."""
def setUp(self):
self.net = ClientNetwork(key='fake', alg=None)
self.response = mock.MagicMock(ok=True, status_code=http_client.OK)
self.response.headers = {}
self.response.links = {}
self.response.checked = False
self.acmev1_nonce_response = mock.MagicMock(
ok=False, status_code=http_client.METHOD_NOT_ALLOWED)
self.acmev1_nonce_response.headers = {}
self.obj = mock.MagicMock()
self.wrapped_obj = mock.MagicMock()
self.content_type = mock.sentinel.content_type
self.all_nonces = [
jose.b64encode(b'Nonce'),
jose.b64encode(b'Nonce2'), jose.b64encode(b'Nonce3')]
self.available_nonces = self.all_nonces[:]
def send_request(*args, **kwargs):
# pylint: disable=unused-argument,missing-docstring
assert "new_nonce_url" not in kwargs
method = args[0]
uri = args[1]
if method == 'HEAD' and uri != "new_nonce_uri":
response = self.acmev1_nonce_response
else:
response = self.response
if self.available_nonces:
response.headers = {
self.net.REPLAY_NONCE_HEADER:
self.available_nonces.pop().decode()}
else:
response.headers = {}
return response
# pylint: disable=protected-access
self.net._send_request = self.send_request = mock.MagicMock(
side_effect=send_request)
self.net._check_response = self.check_response
self.net._wrap_in_jws = mock.MagicMock(return_value=self.wrapped_obj)
def check_response(self, response, content_type):
# pylint: disable=missing-docstring
assert self.response == response
assert self.content_type == content_type
assert self.response.ok
self.response.checked = True
return self.response
def test_head(self):
assert self.acmev1_nonce_response == self.net.head(
'http://example.com/', 'foo', bar='baz')
self.send_request.assert_called_once_with(
'HEAD', 'http://example.com/', 'foo', bar='baz')
def test_head_v2(self):
assert self.response == self.net.head(
'new_nonce_uri', 'foo', bar='baz')
self.send_request.assert_called_once_with(
'HEAD', 'new_nonce_uri', 'foo', bar='baz')
def test_get(self):
assert self.response == self.net.get(
'http://example.com/', content_type=self.content_type, bar='baz')
assert self.response.checked
self.send_request.assert_called_once_with(
'GET', 'http://example.com/', bar='baz')
def test_post_no_content_type(self):
self.content_type = self.net.JOSE_CONTENT_TYPE
assert self.response == self.net.post('uri', self.obj)
assert self.response.checked
def test_post(self):
assert self.response == self.net.post(
'uri', self.obj, content_type=self.content_type)
assert self.response.checked
self.net._wrap_in_jws.assert_called_once_with(
self.obj, jose.b64decode(self.all_nonces.pop()), "uri")
self.available_nonces = []
with pytest.raises(errors.MissingNonce):
self.net.post('uri', self.obj, content_type=self.content_type)
self.net._wrap_in_jws.assert_called_with(
self.obj, jose.b64decode(self.all_nonces.pop()), "uri")
def test_post_wrong_initial_nonce(self): # HEAD
self.available_nonces = [b'f', jose.b64encode(b'good')]
with pytest.raises(errors.BadNonce):
self.net.post('uri',
self.obj, content_type=self.content_type)
def test_post_wrong_post_response_nonce(self):
self.available_nonces = [jose.b64encode(b'good'), b'f']
with pytest.raises(errors.BadNonce):
self.net.post('uri',
self.obj, content_type=self.content_type)
def test_post_failed_retry(self):
check_response = mock.MagicMock()
check_response.side_effect = messages.Error.with_code('badNonce')
self.net._check_response = check_response
with pytest.raises(messages.Error):
self.net.post('uri',
self.obj, content_type=self.content_type)
def test_post_not_retried(self):
check_response = mock.MagicMock()
check_response.side_effect = [messages.Error.with_code('malformed'),
self.response]
self.net._check_response = check_response
with pytest.raises(messages.Error):
self.net.post('uri',
self.obj, content_type=self.content_type)
def test_post_successful_retry(self):
post_once = mock.MagicMock()
post_once.side_effect = [messages.Error.with_code('badNonce'),
self.response]
assert self.response == self.net.post(
'uri', self.obj, content_type=self.content_type)
def test_head_get_post_error_passthrough(self):
self.send_request.side_effect = requests.exceptions.RequestException
for method in self.net.head, self.net.get:
with pytest.raises(requests.exceptions.RequestException):
method('GET', 'uri')
with pytest.raises(requests.exceptions.RequestException):
self.net.post('uri', obj=self.obj)
def test_post_bad_nonce_head(self):
# pylint: disable=protected-access
# regression test for https://github.com/certbot/certbot/issues/6092
bad_response = mock.MagicMock(ok=False, status_code=http_client.SERVICE_UNAVAILABLE)
self.net._send_request = mock.MagicMock()
self.net._send_request.return_value = bad_response
self.content_type = None
check_response = mock.MagicMock()
self.net._check_response = check_response
with pytest.raises(errors.ClientError):
self.net.post('uri',
self.obj, content_type=self.content_type,
new_nonce_url='new_nonce_uri')
assert check_response.call_count == 1
def test_new_nonce_uri_removed(self):
self.content_type = None
self.net.post('uri', self.obj, content_type=None, new_nonce_url='new_nonce_uri')
def test_no_key_error(self):
"A ClientNetwork with no key should error on POST but succeed on GET"
self.net = ClientNetwork()
self.net._send_request = mock.MagicMock()
self.net._send_request.return_value = self.response
with pytest.raises(errors.Error):
self.net.post('uri', "body")
assert self.response == self.net.get(
'uri', content_type=self.content_type, bar='baz')
if __name__ == '__main__':
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover