# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.
"""Asynchronous Celery tasks for django-ca.
.. seealso:: https://docs.celeryproject.org/en/stable/index.html
"""
import logging
import warnings
from datetime import UTC, datetime, timedelta
from http import HTTPStatus
import requests
from cryptography import x509
from cryptography.x509.oid import ExtensionOID
from django.conf import settings
from django.core.mail import send_mail
from django.db import transaction
from django.utils import timezone
from django_ca.acme.validation import validate_dns_01
from django_ca.celery import DjangoCaTask, run_task, shared_task
from django_ca.celery.messages import (
ApiSignCertificateTaskArgs,
UseCertificateAuthoritiesTaskArgs,
UseCertificateAuthorityTaskArgs,
)
from django_ca.conf import model_settings
from django_ca.constants import EXTENSION_DEFAULT_CRITICAL
from django_ca.deprecation import RemovedInDjangoCA320Warning
from django_ca.models import (
AcmeAuthorization,
AcmeCertificate,
AcmeChallenge,
AcmeOrder,
Certificate,
CertificateAuthority,
CertificateExpiryNotification,
CertificateOrder,
)
from django_ca.profiles import profiles
from django_ca.utils import parse_general_name
log = logging.getLogger(__name__)
[docs]
@shared_task(base=DjangoCaTask)
def cache_crl(data: UseCertificateAuthorityTaskArgs) -> None:
"""**Deprecated.** Use :py:func:`django_ca.tasks.generate_crl` instead.
This task will be removed in ``django-ca~=3.1.0``.
"""
warning = "This task is deprecated, call `django_ca.tasks.generate_crl`."
warnings.warn(warning, RemovedInDjangoCA320Warning, stacklevel=1)
generate_crl(data)
[docs]
@shared_task(base=DjangoCaTask)
def cache_crls(data: UseCertificateAuthoritiesTaskArgs | None = None) -> None:
"""**Deprecated.** Use :py:func:`django_ca.tasks.generate_crls` instead.
This task will be removed in ``django-ca~=3.1.0``.
"""
warning = "This task is deprecated, call `django_ca.tasks.generate_crls`."
warnings.warn(warning, RemovedInDjangoCA320Warning, stacklevel=1)
generate_crls(data)
[docs]
@shared_task(base=DjangoCaTask)
def generate_crl(data: UseCertificateAuthorityTaskArgs) -> None:
"""Celery task to generate CRLs for a single certificate authority.
.. versionadded:: 3.0.0
This task was previously called ``cache_crl``.
"""
assert isinstance(data, UseCertificateAuthorityTaskArgs)
ca: CertificateAuthority = CertificateAuthority.objects.get(serial=data.serial)
key_backend_options = ca.key_backend.use_model.model_validate(
data.key_backend_options, context={"ca": ca, "backend": ca.key_backend}, strict=True
)
ca.generate_crls(key_backend_options, force=data.force)
[docs]
@shared_task(base=DjangoCaTask)
def generate_crls(data: UseCertificateAuthoritiesTaskArgs | None = None) -> None:
"""Task to cache the CRLs for all CAs.
.. versionadded:: 3.0.0
This task was previously called ``cache_crls``.
.. versionchanged:: 3.0.0
The task now supports the `force` and the `exclude` parameter.
"""
if data is None:
data = UseCertificateAuthoritiesTaskArgs()
assert isinstance(data, UseCertificateAuthoritiesTaskArgs)
serials = data.serials
if not serials:
ca_qs = CertificateAuthority.objects.usable()
if data.exclude:
ca_qs = ca_qs.exclude(serial__in=data.exclude)
serials = tuple(ca_qs.values_list("serial", flat=True))
for serial in serials:
try:
options = data.key_backend_options.get(serial, {})
message = UseCertificateAuthorityTaskArgs(
serial=serial, force=data.force, key_backend_options=options
)
run_task(generate_crl, message)
except Exception: # pylint: disable=broad-exception-caught
# NOTE: When using Celery, an exception will only be raised here if task.delay() itself raises an
# exception, e.g. if the connection to the broker fails. Without celery, exceptions in
# `generate_crl()` are raised here directly.
log.exception("Error generating CRL for %s", serial)
[docs]
@shared_task(base=DjangoCaTask)
def generate_ocsp_key(data: UseCertificateAuthorityTaskArgs) -> int | None:
"""Celery task to generate an OCSP key for a single certificate authority.
The `serial` names the certificate authority for which to regenerate the OCSP responder certificate. All
other arguments are passed on to :py:func:`~django_ca.models.CertificateAuthority.generate_ocsp_key`.
The task returns the primary key of the generated certificate if it was generated, or ``None`` otherwise.
"""
assert isinstance(data, UseCertificateAuthorityTaskArgs)
ca: CertificateAuthority = CertificateAuthority.objects.get(serial=data.serial)
key_backend_options = ca.key_backend.use_model.model_validate(
data.key_backend_options, context={"ca": ca, "backend": ca.key_backend}, strict=True
)
cert = ca.generate_ocsp_key(key_backend_options=key_backend_options, force=data.force)
if cert is not None:
return cert.pk
return None
[docs]
@shared_task(base=DjangoCaTask)
def generate_ocsp_keys(data: UseCertificateAuthoritiesTaskArgs | None = None) -> None:
"""Task to generate an OCSP keys for all usable CAs."""
if data is None:
data = UseCertificateAuthoritiesTaskArgs()
assert isinstance(data, UseCertificateAuthoritiesTaskArgs)
serials = data.serials
if not serials:
serials = tuple(CertificateAuthority.objects.usable().values_list("serial", flat=True))
for serial in serials:
try:
options = data.key_backend_options.get(serial, {})
message = UseCertificateAuthorityTaskArgs(serial=serial, key_backend_options=options)
run_task(generate_ocsp_key, message)
except Exception: # pylint: disable=broad-exception-caught
# NOTE: When using Celery, an exception will only be raised here if task.delay() itself raises an
# exception, e.g. if the connection to the broker fails. Without celery, exceptions in
# generate_ocsp_key() are raised here directly.
log.exception("Error creating OCSP responder key for %s", serial)
[docs]
@shared_task
def notify_watchers() -> None:
"""Task to send notification emails for expiring certificates."""
if not model_settings.CA_NOTIFICATION_DAYS: # no notifications are configured
return
# Retrieve non-expired, non-revoked certificates within the notification window
now = timezone.now()
max_expiry = now + max(model_settings.CA_NOTIFICATION_DAYS)
certificates = (
Certificate.objects.valid(now)
.filter(not_after__lte=max_expiry, watchers__isnull=False)
.distinct()
.prefetch_related("notifications", "watchers")
)
notification_days = {td.days for td in model_settings.CA_NOTIFICATION_DAYS}
for cert in certificates:
days_until_expiry = (cert.not_after - now).days
if days_until_expiry not in notification_days:
continue
# Skip if a notification was already sent for this day (uses prefetch cache)
if any(n.days == days_until_expiry for n in cert.notifications.all()):
continue
# Only send if there are watchers to notify
recipient = [w.mail for w in cert.watchers.all()]
if not recipient: # pragma: no cover # already excluded via queryset - this is just a safeguard
continue
timestamp = cert.not_after.strftime("%Y-%m-%d")
subj = f"Certificate expiration for {cert.cn} on {timestamp}"
msg = f"The certificate for {cert.cn} will expire on {timestamp}."
send_mail(subj, msg, settings.DEFAULT_FROM_EMAIL, recipient)
CertificateExpiryNotification.objects.create(certificate=cert, days=days_until_expiry)
[docs]
@shared_task(base=DjangoCaTask)
@transaction.atomic
def api_sign_certificate(data: ApiSignCertificateTaskArgs) -> int | None:
"""Sign a certificate from the given order with the given parameters."""
assert isinstance(data, ApiSignCertificateTaskArgs)
order = CertificateOrder.objects.select_related("certificate_authority").get(pk=data.order_pk)
ca: CertificateAuthority = order.certificate_authority
key_backend_options = ca.key_backend.get_use_private_key_options(ca, data.key_backend_options)
# Get some certificate details
algorithm = data.get_algorithm()
csr = x509.load_pem_x509_csr(data.csr)
extensions = [ext.cryptography for ext in data.extensions]
# Add any CA extensions not already set by the request
extension_oids = tuple(ext.oid for ext in extensions)
for oid, extension in ca.extensions_for_certificate.items():
if oid not in extension_oids:
extensions.append(extension)
# Create a signed certificate
try:
certificate = ca.sign(
key_backend_options,
csr,
subject=data.subject.cryptography,
algorithm=algorithm,
not_after=data.not_after,
extensions=extensions,
)
except Exception: # pylint: disable=broad-exception-caught # really want to catch everything
log.exception("Could not sign certificate")
order.status = CertificateOrder.STATUS_FAILED
order.error_code = 1
order.error = "Could not sign certificate."
order.save()
return None
# Store certificate in database
certificate_obj = Certificate(ca=ca, csr=csr, profile=data.profile, autogenerated=data.autogenerated)
certificate_obj.update_certificate(certificate)
certificate_obj.save()
# Update certificate order
order.status = CertificateOrder.STATUS_ISSUED
order.certificate = certificate_obj
order.save()
return certificate_obj.pk
[docs]
@shared_task
@transaction.atomic
def acme_validate_challenge(challenge_pk: int) -> None:
"""Validate an ACME challenge."""
if not model_settings.CA_ENABLE_ACME:
log.error("ACME is not enabled.")
return
try:
challenge = AcmeChallenge.objects.url().get(pk=challenge_pk)
except AcmeChallenge.DoesNotExist:
log.error("Challenge with id=%s not found", challenge_pk)
return
# Whoever is invoking this task is responsible for setting the status to "processing" first.
if challenge.status != AcmeChallenge.STATUS_PROCESSING:
log.error(
"%s: %s: Invalid state (must be %s)", challenge, challenge.status, AcmeChallenge.STATUS_PROCESSING
)
return
# If the auth cannot be used for validation, neither can this challenge. We check auth.usable instead of
# challenge.usable b/c a challenge in the "processing" state is not "usable" (= it is already being used).
if challenge.auth.usable is False:
log.error("%s: Authentication is not usable", challenge)
return
# General data for challenge validation
value = challenge.auth.value
# Challenge is marked as invalid by default
challenge_valid = False
# Validate HTTP challenge (only thing supported so far)
if challenge.type == AcmeChallenge.TYPE_HTTP_01:
decoded_token = challenge.encoded_token.decode("utf-8")
expected = challenge.expected
if requests is None: # pragma: no cover
log.error("requests is not installed, cannot do http-01 challenge validation.")
return
url = f"http://{value}/.well-known/acme-challenge/{decoded_token}"
try:
with requests.get(url, timeout=1, stream=True) as response:
# Only fetch the response body if the status code is HTTP 200 (OK)
if response.status_code == HTTPStatus.OK:
# Only fetch the expected number of bytes to prevent a large file ending up in memory
# But fetch one extra byte (if available) to make sure that response has no extra bytes
received = response.raw.read(len(expected) + 1, decode_content=True)
challenge_valid = received == expected
except Exception as ex: # pylint: disable=broad-except
log.exception(ex)
elif challenge.type == AcmeChallenge.TYPE_DNS_01:
challenge_valid = validate_dns_01(challenge)
else: # pragma: no cover
log.error("%s: Challenge type is not supported.", challenge)
# Transition state of the challenge depending on if the challenge is valid or not. RFC8555, Section 7.1.6:
#
# "If validation is successful, the challenge moves to the "valid" state; if there is an error, the
# challenge moves to the "invalid" state."
#
# We also transition the matching authorization object:
#
# "If one of the challenges listed in the authorization transitions to the "valid" state, then the
# authorization also changes to the "valid" state. If the client attempts to fulfill a challenge and
# fails, or if there is an error while the authorization is still pending, then the authorization
# transitions to the "invalid" state.
#
# We also transition the matching order object (section 7.4):
#
# "* ready: The server agrees that the requirements have been fulfilled, and is awaiting finalization.
# Submit a finalization request."
if challenge_valid:
challenge.status = AcmeChallenge.STATUS_VALID
challenge.validated = timezone.now()
challenge.auth.status = AcmeAuthorization.STATUS_VALID
# Set the order status to READY if all challenges are valid
auths = AcmeAuthorization.objects.filter(order=challenge.auth.order)
auths = auths.exclude(status=AcmeAuthorization.STATUS_VALID)
if not auths.exclude(pk=challenge.auth.pk).exists():
log.info("Order is now valid")
challenge.auth.order.status = AcmeOrder.STATUS_READY
else:
challenge.status = AcmeChallenge.STATUS_INVALID
# RFC 8555, section 7.1.6:
#
# If the client attempts to fulfill a challenge and fails, or if there is an error while the
# authorization is still pending, then the authorization transitions to the "invalid" state.
challenge.auth.status = AcmeAuthorization.STATUS_INVALID
# RFC 8555, section 7.1.6:
#
# If an error occurs at any of these stages, the order moves to the "invalid" state.
challenge.auth.order.status = AcmeOrder.STATUS_INVALID
log.info("%s is %s", challenge, challenge.status)
challenge.save()
challenge.auth.save()
challenge.auth.order.save()
[docs]
@shared_task
@transaction.atomic
def acme_issue_certificate(acme_certificate_pk: int) -> None:
"""Actually issue an ACME certificate."""
if not model_settings.CA_ENABLE_ACME:
log.error("ACME is not enabled.")
return
try:
acme_cert = AcmeCertificate.objects.select_related("order__account__ca").get(pk=acme_certificate_pk)
except AcmeCertificate.DoesNotExist:
log.error("Certificate with id=%s not found", acme_certificate_pk)
return
if acme_cert.usable is False:
log.error("%s: Cannot issue certificate for this order", acme_cert.order)
return
names = [a.subject_alternative_name for a in acme_cert.order.authorizations.all()]
log.info("%s: Issuing certificate for %s", acme_cert.order, ",".join(names))
subject_alternative_names = x509.SubjectAlternativeName([parse_general_name(name) for name in names])
extensions = [
x509.Extension(
oid=ExtensionOID.SUBJECT_ALTERNATIVE_NAME,
critical=EXTENSION_DEFAULT_CRITICAL[ExtensionOID.SUBJECT_ALTERNATIVE_NAME],
value=subject_alternative_names,
)
]
ca = acme_cert.order.account.ca
profile = profiles[ca.acme_profile]
# Honor not_after from the order if set
if acme_cert.order.not_after:
not_after = acme_cert.order.not_after
# Make sure not_after is tz-aware, even if USE_TZ=False.
if timezone.is_naive(not_after):
not_after = timezone.make_aware(not_after)
else:
not_after = datetime.now(tz=UTC) + model_settings.CA_ACME_DEFAULT_CERT_VALIDITY
csr = acme_cert.parse_csr()
# Initialize key backend options
key_backend_options = ca.key_backend.get_use_private_key_options(ca, {})
# Finally, actually create a certificate
cert = Certificate.objects.create_cert(
ca, key_backend_options, csr=csr, profile=profile, not_after=not_after, extensions=extensions
)
acme_cert.cert = cert
acme_cert.order.status = AcmeOrder.STATUS_VALID
acme_cert.order.save()
acme_cert.save()
[docs]
@shared_task
@transaction.atomic
def acme_cleanup() -> None:
"""Cleanup expired ACME orders."""
if not model_settings.CA_ENABLE_ACME:
# NOTE: Since this task does only cleanup, log message is only info.
log.info("ACME is not enabled, not doing anything.")
return
# Delete orders that expired more than a day ago.
threshold = timezone.now() - timedelta(days=1)
AcmeOrder.objects.filter(expires__lt=threshold).delete()