Skip to content
99 changes: 99 additions & 0 deletions truenas_acme_utils/ari.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import json
import time
from datetime import datetime

import requests


# ARI retry configuration per RFC 9773 Section 4.3.3
DEFAULT_RETRY_AFTER = 21600 # 6 hours default
MIN_RETRY_AFTER = 60 # 1 minute minimum
MAX_RETRY_AFTER = 86400 # 1 day maximum
MAX_RETRIES = 3 # Max temporary error retries


def fetch_renewal_info(ari_endpoint: str, cert_id: str, retries: int = MAX_RETRIES, timeout: int = 30) -> dict:
"""
Fetch renewal information from ACME server per RFC 9773 Section 4

Implements exponential backoff for temporary errors per RFC 9773 Section 4.3.3

:param ari_endpoint: RenewalInfo endpoint URL
:param cert_id: Unique certificate identifier from get_cert_id()
:param retries: Number of retries remaining for temporary errors
:param timeout: Request timeout in seconds
:return: Dict with error field (None if success), suggestedWindow (start/end datetimes),
optional explanationURL, retry_after
"""
url = f'{ari_endpoint.rstrip("/")}/{cert_id}'
backoff_delay = 1
response = None

for attempt in range(retries + 1):
try:
response = requests.get(url, timeout=timeout)

# Check for HTTP 409 alreadyReplaced error (RFC 9773 Section 7.4)
if response.status_code == 409:
return {'error': 'Certificate has already been marked as replaced'}

# Handle 5xx server errors as temporary (RFC 9773 Section 4.3.3)
if 500 <= response.status_code < 600:
if attempt < retries:
time.sleep(backoff_delay)
backoff_delay *= 2
continue
return {'error': f'ARI server error after {retries + 1} attempts: HTTP {response.status_code}'}

if response.status_code not in (200, 201, 204):
return {'error': f'ARI request failed: HTTP {response.status_code}'}

data = response.json()
break

except (ConnectionError, TimeoutError, requests.exceptions.RequestException) as e:
if attempt < retries:
time.sleep(backoff_delay)
backoff_delay *= 2
continue

return {'error': f'ARI request failed after {retries + 1} attempts: {e}'}
except json.JSONDecodeError as e:
return {'error': f'Invalid JSON response: {e}'}
except Exception as e:
return {'error': f'ARI request failed: {e}'}

if 'suggestedWindow' not in data:
return {'error': 'Invalid ARI response: missing suggestedWindow'}

window = data['suggestedWindow']
if 'start' not in window or 'end' not in window:
return {'error': 'Invalid suggestedWindow: missing start or end'}

try:
start = datetime.fromisoformat(window['start'].replace('Z', '+00:00'))
end = datetime.fromisoformat(window['end'].replace('Z', '+00:00'))
except (ValueError, TypeError) as e:
return {'error': f'Invalid date format in suggestedWindow: {e}'}

result = {
'error': None,
'suggested_window': {'start': start, 'end': end},
'retry_after': None,
'explanation_url': data.get('explanationURL'),
}

# Parse Retry-After header per RFC 9773 Section 4.3
if response and 'Retry-After' in response.headers:
try:
retry_after = int(response.headers['Retry-After'])
# Clamp to reasonable limits per RFC 9773 Section 4.3.2
retry_after = max(MIN_RETRY_AFTER, min(retry_after, MAX_RETRY_AFTER))
result['retry_after'] = retry_after
except ValueError:
result['retry_after'] = DEFAULT_RETRY_AFTER
else:
# Use default if not provided per RFC 9773 Section 4.3.3
result['retry_after'] = DEFAULT_RETRY_AFTER

return result
54 changes: 52 additions & 2 deletions truenas_acme_utils/client_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
import typing

import josepy as jose
from acme import client, messages
from acme import client, crypto_util, messages
from cryptography import x509


class NewOrder(messages.NewOrder):
replaces: str = jose.field('replaces', omitempty=True)


class BodyDict(typing.TypedDict):
Expand All @@ -17,6 +22,7 @@ class ACMEClientAndKeyData(typing.TypedDict):
new_nonce_uri: str
new_order_uri: str
revoke_cert_uri: str
renewal_info: str | None
body: BodyDict


Expand All @@ -29,6 +35,7 @@ def get_acme_client_and_key(data: ACMEClientAndKeyData) -> tuple[client.ClientV2
- new_nonce_uri: str
- new_order_uri: str
- revoke_cert_uri: str
- renewal_info: str (optional)
- body: dict
- status: str
- key: dict
Expand Down Expand Up @@ -58,7 +65,50 @@ def get_acme_client_and_key(data: ACMEClientAndKeyData) -> tuple[client.ClientV2
'newAccount': data['new_account_uri'],
'newNonce': data['new_nonce_uri'],
'newOrder': data['new_order_uri'],
'revokeCert': data['revoke_cert_uri']
'revokeCert': data['revoke_cert_uri'],
**({'renewalInfo': data['renewal_info']} if data.get('renewal_info') else {}),
}),
client.ClientNetwork(key, account=registration)
), key


def acme_order(
acme_client: client.ClientV2, csr_pem: bytes, replaces_cert_id: str | None = None,
) -> messages.OrderResource:
csr = x509.load_pem_x509_csr(csr_pem)
dnsNames = crypto_util.get_names_from_subject_and_extensions(csr.subject, csr.extensions)
try:
san_ext = csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.ExtensionNotFound:
ipNames = []
else:
ipNames = san_ext.value.get_values_for_type(x509.IPAddress)

identifiers = []
for name in dnsNames:
identifiers.append(messages.Identifier(typ=messages.IDENTIFIER_FQDN, value=name))

for ip in ipNames:
identifiers.append(messages.Identifier(typ=messages.IDENTIFIER_IP, value=str(ip)))

payload = {'identifiers': identifiers}
if replaces_cert_id:
payload['replaces'] = replaces_cert_id

order = NewOrder(**payload)
response = acme_client._post(acme_client.directory['newOrder'], order)
body = messages.Order.from_json(response.json())

authorizations = []
# pylint has trouble understanding our josepy based objects which use
# things like custom metaclass logic. body.authorizations should be a
# list of strings containing URLs so let's disable this check here.
for url in body.authorizations: # pylint: disable=not-an-iterable
authorizations.append(acme_client._authzr_from_response(acme_client._post_as_get(url), uri=url))

return messages.OrderResource(
body=body,
uri=response.headers.get('Location'),
authorizations=authorizations,
csr_pem=csr_pem,
)
8 changes: 5 additions & 3 deletions truenas_acme_utils/issue_cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import josepy as jose
from acme import errors, messages

from .client_utils import ACMEClientAndKeyData, get_acme_client_and_key
from .client_utils import ACMEClientAndKeyData, acme_order, get_acme_client_and_key
from .event import send_event
from .exceptions import CallError

Expand All @@ -15,13 +15,15 @@


def issue_certificate(
acme_client_key_payload: ACMEClientAndKeyData, csr: str, authenticator_mapping_copy: dict, progress_base: int = 25
acme_client_key_payload: ACMEClientAndKeyData, csr: str, authenticator_mapping_copy: dict, progress_base: int = 25,
cert_renewal_id: str | None = None,
) -> messages.OrderResource:
# cert_id is the ID of the certificate being replaced if any
# Authenticator mapping should be a valid mapping of domain to authenticator object
acme_client, key = get_acme_client_and_key(acme_client_key_payload)
try:
# perform operations and have a cert issued
order = acme_client.new_order(csr.encode())
order = acme_order(acme_client, csr.encode(), cert_renewal_id)
except messages.Error as e:
raise CallError(f'Failed to issue a new order for Certificate : {e}')
else:
Expand Down
34 changes: 34 additions & 0 deletions truenas_crypto_utils/read.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import datetime
import dateutil
import dateutil.parser
Expand All @@ -6,9 +7,11 @@

from contextlib import suppress
from typing import TypeAlias
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519, ed448, rsa
from cryptography.x509.oid import ExtensionOID
from OpenSSL import crypto

from .utils import RE_CERTIFICATE
Expand All @@ -20,6 +23,37 @@
logger = logging.getLogger(__name__)


def _b64url(b: bytes) -> str:
return base64.urlsafe_b64encode(b).decode().rstrip("=")


def _serial_value_bytes(n: int) -> bytes:
# DER INTEGER value bytes for non-negative n
if n == 0:
return b'\x00'
v = n.to_bytes((n.bit_length() + 7) // 8, 'big')
return v if not (v[0] & 0x80) else b'\x00' + v


def get_cert_id(cert_str: str) -> str:
"""
ARI cert_id per RFC 9773 §4.1
format: base64url(AKI.keyIdentifier) + "." + base64url(serial INTEGER value bytes)
"""
cert = x509.load_pem_x509_certificate(cert_str.encode(), default_backend())
try:
aki_ext = cert.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
except x509.ExtensionNotFound as e:
raise ValueError('Certificate missing Authority Key Identifier (AKI)') from e

if aki_ext.value.key_identifier is None:
raise ValueError('AKI keyIdentifier is None')

aki_b64 = _b64url(aki_ext.value.key_identifier)
serial_b64 = _b64url(_serial_value_bytes(cert.serial_number))
return f'{aki_b64}.{serial_b64}'


def parse_cert_date_string(date_value: bytes | str) -> str:
t1 = dateutil.parser.parse(date_value)
t2 = t1.astimezone(dateutil.tz.tzlocal())
Expand Down