Source code for django_ca.tasks

# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Asynchronous Celery tasks for django-ca.

.. seealso:: https://docs.celeryproject.org/en/stable/index.html
"""

import logging
import typing
from collections.abc import Callable, Iterable
from datetime import datetime, timedelta, timezone as tz
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, cast

import requests

from cryptography import x509
from cryptography.x509.oid import ExtensionOID

from django.db import transaction
from django.utils import timezone

from django_ca.acme.validation import validate_dns_01
from django_ca.conf import model_settings
from django_ca.constants import EXTENSION_DEFAULT_CRITICAL
from django_ca.models import (
    AcmeAuthorization,
    AcmeCertificate,
    AcmeChallenge,
    AcmeOrder,
    Certificate,
    CertificateAuthority,
    CertificateOrder,
)
from django_ca.profiles import profiles
from django_ca.pydantic.messages import GenerateOCSPKeyMessage, SignCertificateMessage
from django_ca.typehints import (
    JSON,
    SerializedPydanticExtension,
    SerializedPydanticName,
    SignatureHashAlgorithmName,
)
from django_ca.utils import parse_general_name

log = logging.getLogger(__name__)


if TYPE_CHECKING:
    # This module comes from our stubs
    from celery.typehints import TaskParamSpec, TaskReturnSpec

try:
    from celery import shared_task
    from celery.local import Proxy
except ImportError:
    if TYPE_CHECKING:
        from celery.local import Proxy

    def shared_task(
        func: "Callable[TaskParamSpec, TaskReturnSpec]",
    ) -> "Proxy[TaskParamSpec, TaskReturnSpec]":
        """Dummy decorator so that we can use the decorator whether celery is installed or not."""
        # We do not yet need this, but might come in handy in the future:
        # func.delay = lambda *a, **kw: func(*a, **kw)
        # func.apply_async = lambda *a, **kw: func(*a, **kw)
        func.delay = func  # type: ignore[attr-defined]
        return cast("Proxy[TaskParamSpec, TaskReturnSpec]", func)


# pragma: only py<3.10: Use typing.ParamSpec for better type hinting
def run_task(
    task: "Proxy[TaskParamSpec, TaskReturnSpec]",
    *args: "TaskParamSpec.args",
    **kwargs: "TaskParamSpec.kwargs",
) -> Any:
    """Function that passes `task` to celery or invokes it directly, depending on if Celery is installed."""
    eager = kwargs.pop("eager", False)

    if model_settings.CA_USE_CELERY is True and eager is False:
        return task.delay(*args, **kwargs)

    return task(*args, **kwargs)


[docs] @shared_task def cache_crl(serial: str, key_backend_options: dict[str, JSON] | None = None) -> None: """Task to cache the CRL for a given CA.""" if key_backend_options is None: key_backend_options = {} ca = CertificateAuthority.objects.get(serial=serial) key_backend_options_model = ca.key_backend.use_model.model_validate( key_backend_options, context={"ca": ca, "backend": ca.key_backend}, strict=True ) ca.cache_crls(key_backend_options_model)
[docs] @shared_task def cache_crls( serials: Iterable[str] | None = None, key_backend_options: dict[str, dict[str, JSON]] | None = None ) -> None: """Task to cache the CRLs for all CAs.""" if serials is None: serials = [] if key_backend_options is None: key_backend_options = {} if not serials: serials = typing.cast( Iterable[str], CertificateAuthority.objects.usable().values_list("serial", flat=True) ) for serial in serials: try: run_task(cache_crl, serial, key_backend_options=key_backend_options.get(serial, {})) except Exception: # pylint: disable=broad-exception-caught # NOTE: When using Celery, an exception will only be raised here if task.delay() itself raises an # exception, e.g. if the connection to the broker fails. Without celery, exceptions in cache_crl() # are raised here directly. log.exception("Error caching CRL for %s", serial)
[docs] @shared_task def generate_ocsp_key( serial: str, key_backend_options: dict[str, JSON] | None = None, force: bool = False ) -> int | None: """Task to generate an OCSP key for the CA named by `serial`. The `serial` names the certificate authority for which to regenerate the OCSP responder certificate. All other arguments are passed on to :py:func:`~django_ca.models.CertificateAuthority.generate_ocsp_key`. The task returns the primary key of the generated certificate if it was generated, or ``None`` otherwise. """ if key_backend_options is None: key_backend_options = {} parameters = GenerateOCSPKeyMessage(serial=serial, force=force) ca: CertificateAuthority = CertificateAuthority.objects.get(serial=parameters.serial) key_backend_options_model = ca.key_backend.use_model.model_validate( key_backend_options, context={"ca": ca, "backend": ca.key_backend}, strict=True ) cert = ca.generate_ocsp_key(key_backend_options=key_backend_options_model, force=parameters.force) if cert is not None: return cert.pk return None
[docs] @shared_task def generate_ocsp_keys( serials: Iterable[str] | None = None, key_backend_options: dict[str, dict[str, JSON]] | None = None ) -> None: """Task to generate an OCSP keys for all usable CAs.""" if serials is None: serials = [] if key_backend_options is None: key_backend_options = {} if not serials: serials = typing.cast( Iterable[str], CertificateAuthority.objects.usable().values_list("serial", flat=True) ) for serial in serials: try: run_task(generate_ocsp_key, serial, key_backend_options=key_backend_options.get(serial, {})) except Exception: # pylint: disable=broad-exception-caught # NOTE: When using Celery, an exception will only be raised here if task.delay() itself raises an # exception, e.g. if the connection to the broker fails. Without celery, exceptions in # generate_ocsp_key() are raised here directly. log.exception("Error creating OCSP responder key for %s", serial)
[docs] @shared_task @transaction.atomic def api_sign_certificate( order_pk: int, csr: str, subject: SerializedPydanticName, algorithm: SignatureHashAlgorithmName | None = None, not_after: str | None = None, extensions: list[SerializedPydanticExtension] | None = None, profile: str = model_settings.CA_DEFAULT_PROFILE, autogenerated: bool = False, key_backend_options: dict[str, JSON] | None = None, ) -> int | None: """Sign a certificate from the given order with the given parameters.""" if key_backend_options is None: key_backend_options = {} order = CertificateOrder.objects.select_related("certificate_authority").get(pk=order_pk) ca: CertificateAuthority = order.certificate_authority message = SignCertificateMessage( key_backend_options=key_backend_options, algorithm=algorithm, autogenerated=autogenerated, csr=csr, not_after=not_after, extensions=extensions, profile=profile, subject=subject, ) key_backend_options_model = ca.key_backend.get_use_private_key_options(ca, key_backend_options) parsed_extensions = message.get_extensions() extension_oids = [ext.oid for ext in parsed_extensions] for oid, extension in ca.extensions_for_certificate.items(): if oid not in extension_oids: parsed_extensions.append(extension) parsed_csr = message.get_csr() # Create a signed certificate try: certificate = ca.sign( key_backend_options_model, parsed_csr, subject=message.subject.cryptography, # pylint: disable=no-member # false positive algorithm=message.get_algorithm(), not_after=message.not_after, extensions=parsed_extensions, ) except Exception: # pylint: disable=broad-exception-caught # really want to catch everything log.exception("Could not sign certificate") order.status = CertificateOrder.STATUS_FAILED order.error_code = 1 order.error = "Could not sign certificate." order.save() return None # Store certificate in database certificate_obj = Certificate( ca=ca, csr=parsed_csr, profile=message.profile, autogenerated=message.autogenerated ) certificate_obj.update_certificate(certificate) certificate_obj.save() # Update certificate order order.status = CertificateOrder.STATUS_ISSUED order.certificate = certificate_obj order.save() return certificate_obj.pk
[docs] @shared_task @transaction.atomic def acme_validate_challenge(challenge_pk: int) -> None: """Validate an ACME challenge.""" if not model_settings.CA_ENABLE_ACME: log.error("ACME is not enabled.") return try: challenge = AcmeChallenge.objects.url().get(pk=challenge_pk) except AcmeChallenge.DoesNotExist: log.error("Challenge with id=%s not found", challenge_pk) return # Whoever is invoking this task is responsible for setting the status to "processing" first. if challenge.status != AcmeChallenge.STATUS_PROCESSING: log.error( "%s: %s: Invalid state (must be %s)", challenge, challenge.status, AcmeChallenge.STATUS_PROCESSING ) return # If the auth cannot be used for validation, neither can this challenge. We check auth.usable instead of # challenge.usable b/c a challenge in the "processing" state is not "usable" (= it is already being used). if challenge.auth.usable is False: log.error("%s: Authentication is not usable", challenge) return # General data for challenge validation value = challenge.auth.value # Challenge is marked as invalid by default challenge_valid = False # Validate HTTP challenge (only thing supported so far) if challenge.type == AcmeChallenge.TYPE_HTTP_01: decoded_token = challenge.encoded_token.decode("utf-8") expected = challenge.expected if requests is None: # pragma: no cover log.error("requests is not installed, cannot do http-01 challenge validation.") return url = f"http://{value}/.well-known/acme-challenge/{decoded_token}" try: with requests.get(url, timeout=1, stream=True) as response: # Only fetch the response body if the status code is HTTP 200 (OK) if response.status_code == HTTPStatus.OK: # Only fetch the expected number of bytes to prevent a large file ending up in memory # But fetch one extra byte (if available) to make sure that response has no extra bytes received = response.raw.read(len(expected) + 1, decode_content=True) challenge_valid = received == expected except Exception as ex: # pylint: disable=broad-except log.exception(ex) elif challenge.type == AcmeChallenge.TYPE_DNS_01: challenge_valid = validate_dns_01(challenge) else: # pragma: no cover log.error("%s: Challenge type is not supported.", challenge) # Transition state of the challenge depending on if the challenge is valid or not. RFC8555, Section 7.1.6: # # "If validation is successful, the challenge moves to the "valid" state; if there is an error, the # challenge moves to the "invalid" state." # # We also transition the matching authorization object: # # "If one of the challenges listed in the authorization transitions to the "valid" state, then the # authorization also changes to the "valid" state. If the client attempts to fulfill a challenge and # fails, or if there is an error while the authorization is still pending, then the authorization # transitions to the "invalid" state. # # We also transition the matching order object (section 7.4): # # "* ready: The server agrees that the requirements have been fulfilled, and is awaiting finalization. # Submit a finalization request." if challenge_valid: challenge.status = AcmeChallenge.STATUS_VALID challenge.validated = timezone.now() challenge.auth.status = AcmeAuthorization.STATUS_VALID # Set the order status to READY if all challenges are valid auths = AcmeAuthorization.objects.filter(order=challenge.auth.order) auths = auths.exclude(status=AcmeAuthorization.STATUS_VALID) if not auths.exclude(pk=challenge.auth.pk).exists(): log.info("Order is now valid") challenge.auth.order.status = AcmeOrder.STATUS_READY else: challenge.status = AcmeChallenge.STATUS_INVALID # RFC 8555, section 7.1.6: # # If the client attempts to fulfill a challenge and fails, or if there is an error while the # authorization is still pending, then the authorization transitions to the "invalid" state. challenge.auth.status = AcmeAuthorization.STATUS_INVALID # RFC 8555, section 7.1.6: # # If an error occurs at any of these stages, the order moves to the "invalid" state. challenge.auth.order.status = AcmeOrder.STATUS_INVALID log.info("%s is %s", challenge, challenge.status) challenge.save() challenge.auth.save() challenge.auth.order.save()
[docs] @shared_task @transaction.atomic def acme_issue_certificate(acme_certificate_pk: int) -> None: """Actually issue an ACME certificate.""" if not model_settings.CA_ENABLE_ACME: log.error("ACME is not enabled.") return try: acme_cert = AcmeCertificate.objects.select_related("order__account__ca").get(pk=acme_certificate_pk) except AcmeCertificate.DoesNotExist: log.error("Certificate with id=%s not found", acme_certificate_pk) return if acme_cert.usable is False: log.error("%s: Cannot issue certificate for this order", acme_cert.order) return names = [a.subject_alternative_name for a in acme_cert.order.authorizations.all()] log.info("%s: Issuing certificate for %s", acme_cert.order, ",".join(names)) subject_alternative_names = x509.SubjectAlternativeName([parse_general_name(name) for name in names]) extensions = [ x509.Extension( oid=ExtensionOID.SUBJECT_ALTERNATIVE_NAME, critical=EXTENSION_DEFAULT_CRITICAL[ExtensionOID.SUBJECT_ALTERNATIVE_NAME], value=subject_alternative_names, ) ] ca = acme_cert.order.account.ca profile = profiles[ca.acme_profile] # Honor not_after from the order if set if acme_cert.order.not_after: not_after = acme_cert.order.not_after # Make sure not_after is tz-aware, even if USE_TZ=False. if timezone.is_naive(not_after): not_after = timezone.make_aware(not_after) else: not_after = datetime.now(tz=tz.utc) + model_settings.CA_ACME_DEFAULT_CERT_VALIDITY csr = acme_cert.parse_csr() # Initialize key backend options key_backend_options = ca.key_backend.get_use_private_key_options(ca, {}) # Finally, actually create a certificate cert = Certificate.objects.create_cert( ca, key_backend_options, csr=csr, profile=profile, not_after=not_after, extensions=extensions ) acme_cert.cert = cert acme_cert.order.status = AcmeOrder.STATUS_VALID acme_cert.order.save() acme_cert.save()
[docs] @shared_task @transaction.atomic def acme_cleanup() -> None: """Cleanup expired ACME orders.""" if not model_settings.CA_ENABLE_ACME: # NOTE: Since this task does only cleanup, log message is only info. log.info("ACME is not enabled, not doing anything.") return # Delete orders that expired more than a day ago. threshold = timezone.now() - timedelta(days=1) AcmeOrder.objects.filter(expires__lt=threshold).delete()