# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.
"""Base classes for CA backends."""
import abc
import logging
from collections.abc import Iterator, Sequence
from datetime import datetime
from threading import local
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Generic, Self, TypeVar, cast
from pydantic import BaseModel, Field, model_validator
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric.padding import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric.types import (
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.x509.ocsp import OCSPResponse, OCSPResponseBuilder
from django.core.exceptions import ImproperlyConfigured
from django.core.management import CommandParser
from django.utils.module_loading import import_string
from django_ca import constants
from django_ca.conf import KeyBackendConfigurationModel, model_settings
from django_ca.constants import DEFAULT_KEY_BACKEND_KEY, SIGNATURE_HASH_ALGORITHM_NAMES
from django_ca.pydantic.type_aliases import PowerOfTwoInt
from django_ca.typehints import (
ArgumentGroup,
CertificateExtension,
ParsableKeyType,
SignatureHashAlgorithm,
SignatureHashAlgorithmName,
)
if TYPE_CHECKING:
from django_ca.models import CertificateAuthority
log = logging.getLogger(__name__)
CreatePrivateKeyOptionsTypeVar = TypeVar("CreatePrivateKeyOptionsTypeVar", bound=BaseModel)
UsePrivateKeyOptionsTypeVar = TypeVar("UsePrivateKeyOptionsTypeVar", bound=BaseModel)
StorePrivateKeyOptionsTypeVar = TypeVar("StorePrivateKeyOptionsTypeVar", bound=BaseModel)
class CreatePrivateKeyOptionsBaseModel(BaseModel):
"""Base model for creating private keys that shares common fields and validators."""
key_type: ParsableKeyType
key_size: Annotated[PowerOfTwoInt, Field(ge=model_settings.CA_MIN_KEY_SIZE)] | None = None
@model_validator(mode="after")
def validate_key_size(self) -> Self:
"""Validate that the key size is not set for invalid key types."""
if self.key_type in ("RSA", "DSA") and self.key_size is None:
self.key_size = model_settings.CA_DEFAULT_KEY_SIZE
elif self.key_type not in ("RSA", "DSA") and self.key_size is not None:
raise ValueError(f"Key size is not supported for {self.key_type} keys.")
return self
[docs]
class KeyBackendBase:
"""Base class for backends that create private keys (CAs or OCSP delegate responder certificates)."""
#: Alias under which this backend is configured under settings.KEY_BACKENDS.
alias: str
#: Private key types supported by the key backend. This defines the choices for the ``--key-type``
#: argument and the `key_type` parameter in
#: :py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options` is guaranteed to be
#: one of the named values.
supported_key_types: tuple[str, ...]
#: Hash algorithms supported by the key backend. This defines the choices for the ``--algorithm`` argument
#: and the `algorithm` argument in :py:func:`~django_ca.key_backends.KeyBackend.sign_certificate` is
#: guaranteed to be one of the named values.
supported_hash_algorithms: tuple[SignatureHashAlgorithmName, ...] = tuple(
constants.SIGNATURE_HASH_ALGORITHM_TYPES
)
#: Elliptic curves supported by this backend for elliptic curve keys. This defines the choices for the
#: ``--elliptic-curve`` parameter and the `elliptic_curve` parameter in
#: :py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options` is guaranteed to be
#: one of the named values if ``--key-type=EC`` is passed.
supported_elliptic_curves: tuple[str, ...]
def __init__(self, alias: str, **kwargs: Any) -> None:
self.alias = alias
for key, value in kwargs.items():
setattr(self, key, value)
[docs]
class KeyBackend(
KeyBackendBase,
Generic[CreatePrivateKeyOptionsTypeVar, StorePrivateKeyOptionsTypeVar, UsePrivateKeyOptionsTypeVar],
metaclass=abc.ABCMeta,
):
"""Base class for all key storage backends.
All implementations of a key storage backend must implement this abstract base class.
"""
#: Title used for the ArgumentGroup in :command:`manage.py init_ca`.
title: ClassVar[str]
#: Description used for the ArgumentGroup in :command:`manage.py init_ca`.
description: ClassVar[str]
#: The Pydantic model representing the options used for loading a private key.
use_model: type[UsePrivateKeyOptionsTypeVar]
#: Prefix for argparse to use for arguments. Empty for the default alias, and `{alias}-` otherwise.
argparse_prefix: str = ""
#: Prefix to use for loading options. Empty for the default alias, and `{alias}_` otherwise.
options_prefix: str = ""
def __init__(self, alias: str, **kwargs: Any) -> None:
super().__init__(alias, **kwargs)
if self.alias != DEFAULT_KEY_BACKEND_KEY:
self.argparse_prefix = f"{alias.lower().replace('_', '-')}-"
self.options_prefix = f"{alias.lower().replace('-', '_')}_"
[docs]
def add_create_private_key_group(self, parser: CommandParser) -> ArgumentGroup | None:
"""Add an argument group for arguments for private key generation with this backend.
By default, the title and description of the argument group is based on
:py:attr:`~django_ca.key_backends.base.KeyBackendBase.alias`,
:py:attr:`~django_ca.key_backends.KeyBackend.title` and
:py:attr:`~django_ca.key_backends.KeyBackend.description`.
Return ``None`` if you don't need to create such a group.
"""
return parser.add_argument_group(
f"{self.alias}: {self.title}",
f"The backend used with --key-backend={self.alias}. {self.description}",
)
[docs]
def add_store_private_key_group(self, parser: CommandParser) -> ArgumentGroup | None:
"""Add an argument group for storing private keys (when importing an existing CA).
By default, this method adds the same group as
:py:func:`~django_ca.key_backends.KeyBackend.add_create_private_key_group`
"""
return self.add_create_private_key_group(parser)
[docs]
def add_use_private_key_group(self, parser: CommandParser) -> ArgumentGroup | None:
"""Add an argument group for arguments required for using a private key stored with this backend.
By default, the title and description of the argument group is based on
:py:attr:`~django_ca.key_backends.base.KeyBackendBase.alias` and
:py:attr:`~django_ca.key_backends.KeyBackend.title`.
Return ``None`` if you don't need to create such a group.
"""
return parser.add_argument_group(
f"{self.alias} key storage",
f"Arguments for using private keys stored with the {self.alias} backend.",
)
# pylint: disable-next=unused-argument # default implementation does nothing.
[docs]
def add_create_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for private key generation with this backend.
Add arguments that can be used for generating private keys with your backend to `group`. The arguments
you add here are expected to be loaded (and validated) using
:py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options`.
"""
return None
# pylint: disable-next=unused-argument # default implementation does nothing.
[docs]
def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for loading the private key of a parent certificate authority.
The arguments you add here are expected to be loaded (and validated) using
:py:func:`~django_ca.key_backends.KeyBackend.get_use_parent_private_key_options`.
"""
return None
# pylint: disable-next=unused-argument # default implementation does nothing.
[docs]
def add_store_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for storing private keys (when importing an existing CA)."""
return None
# pylint: disable=unused-argument # Method may not be overwritten, just providing default here
[docs]
def add_use_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments required for using private key stored with this backend.
The arguments you add here are expected to be loaded (and validated) using
:py:func:`~django_ca.key_backends.KeyBackend.get_use_parent_private_key_options`.
"""
return None
[docs]
@abc.abstractmethod
def get_create_private_key_options(
self,
key_type: ParsableKeyType,
key_size: int | None,
elliptic_curve: str | None,
options: dict[str, Any],
) -> CreatePrivateKeyOptionsTypeVar:
"""Get options to create private keys into a Pydantic model.
`options` is the dictionary of arguments from :command:`manage.py init_ca` (including default values).
The returned model will be passed to
:py:func:`~django_ca.key_backends.KeyBackend.create_private_key`.
"""
[docs]
@abc.abstractmethod
def get_use_parent_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> UsePrivateKeyOptionsTypeVar:
"""Get options to use the private key of a parent certificate authority.
The returned model will be used for the certificate authority `ca`. You can pass it as extra context
to influence model validation.
`options` is the dictionary of arguments to :command:`manage.py init_ca` (including default values).
The key backend is expected to be able to sign certificate authorities using the options provided
here.
"""
[docs]
@abc.abstractmethod
def get_use_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> UsePrivateKeyOptionsTypeVar:
"""Get options to use the private key of a certificate authority.
The returned model will be used for the certificate authority `ca`. You can pass it as extra context
to influence model validation.
`options` is the dictionary of arguments to :command:`manage.py init_ca` (including default values).
The key backend is expected to be able to sign certificates and CRLs using the options provided here.
"""
[docs]
@abc.abstractmethod
def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivateKeyOptionsTypeVar:
"""Get options used when storing private keys."""
[docs]
@abc.abstractmethod
def is_usable(
self,
ca: "CertificateAuthority",
use_private_key_options: UsePrivateKeyOptionsTypeVar | None = None,
) -> bool:
"""Boolean returning if the given `ca` can be used to sign new certificates (or CRLs).
The `options` are the options returned by
:py:func:`~django_ca.key_backends.KeyBackend.get_use_private_key_options`. It may be ``None`` in
cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it
knows for sure that it will not be usable, and ``True`` if usability cannot be determined.
"""
[docs]
@abc.abstractmethod
def check_usable(
self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptionsTypeVar
) -> None:
"""Check if the given CA is usable, raise ValueError if not.
The `options` are the options returned by
:py:func:`~django_ca.key_backends.KeyBackend.get_use_private_key_options`. It may be ``None`` in
cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it
knows for sure that it will not be usable, and ``True`` if usability cannot be determined.
"""
[docs]
@abc.abstractmethod
def create_private_key(
self, ca: "CertificateAuthority", key_type: ParsableKeyType, options: CreatePrivateKeyOptionsTypeVar
) -> tuple[CertificateIssuerPublicKeyTypes, UsePrivateKeyOptionsTypeVar]:
"""Create a private key for the certificate authority.
The method is expected to set `key_backend_options` on `ca` with a set of options that can later be
used to load the private key. Since this value will be stored in the database, you should not add
secrets to `key_backend_options`.
Note that `ca` is not yet a *saved* database entity, so fields are only partially populated.
"""
[docs]
@abc.abstractmethod
def store_private_key(
self,
ca: "CertificateAuthority",
key: CertificateIssuerPrivateKeyTypes,
certificate: x509.Certificate,
options: StorePrivateKeyOptionsTypeVar,
) -> None:
"""Store a private key for the certificate authority.
The method is expected to set `key_backend_options` on `ca` with a set of options that can later be
used to load the private key. Since this value will be stored in the database, you should not add
secrets to `key_backend_options`.
Note that `ca` is not yet a *saved* database entity, so fields are only partially populated.
"""
[docs]
def sign_data(
self,
ca: "CertificateAuthority",
use_private_key_options: UsePrivateKeyOptionsTypeVar,
data: bytes,
*,
algorithm: hashes.HashAlgorithm | Prehashed | None = None,
padding: AsymmetricPadding | None = None,
signature_algorithm: ec.EllipticCurveSignatureAlgorithm | None = None,
) -> bytes:
"""Sign arbitrary data.
The `algorithm` parameter is used when using RSA/DSA keys, `padding` is used for RSA keys and
`signature_algorithm` is used for Elliptic Curve keys.
This function is not used by **django-ca** itself, but may be used by plugins. Backends are not
required to implement it, in which case this function raises ``NotImplementedError``.
"""
raise NotImplementedError("This backend does not support signing data.")
[docs]
@abc.abstractmethod
def sign_certificate(
self,
ca: "CertificateAuthority",
use_private_key_options: UsePrivateKeyOptionsTypeVar,
public_key: CertificateIssuerPublicKeyTypes,
serial: int,
algorithm: SignatureHashAlgorithm | None,
issuer: x509.Name,
subject: x509.Name,
not_after: datetime,
# NOTE: Allows any extension, as the function is also used for creating certificate authorities.
extensions: Sequence[CertificateExtension],
) -> x509.Certificate:
"""Sign a certificate."""
[docs]
@abc.abstractmethod
def sign_certificate_revocation_list(
self,
ca: "CertificateAuthority",
use_private_key_options: UsePrivateKeyOptionsTypeVar,
builder: x509.CertificateRevocationListBuilder,
algorithm: SignatureHashAlgorithm | None,
) -> x509.CertificateRevocationList:
"""Sign a certificate revocation list request."""
[docs]
def validate_signature_hash_algorithm(
self,
key_type: ParsableKeyType,
algorithm: SignatureHashAlgorithm | None,
default: SignatureHashAlgorithm | None = None,
) -> SignatureHashAlgorithm | None:
"""Give a backend the opportunity to check the signature hash algorithm or return the default value.
The `algorithm` is the one selected by the user, or ``None`` if no algorithm was selected. The
`default` reflects the signature algorithm of a signing certificate authority and is ``None`` only
when creating a root certificate authority.
Any backend implementation should raise ``ValueError`` if it wants to veto a particular combination of
key type and algorithm.
"""
if key_type not in ("DSA", "RSA", "EC"):
if algorithm is not None:
raise ValueError(f"{key_type} keys do not allow an algorithm for signing.")
return None
# Compute the default hash algorithm
if algorithm is not None:
name = SIGNATURE_HASH_ALGORITHM_NAMES[type(algorithm)]
elif default is not None:
name = constants.SIGNATURE_HASH_ALGORITHM_NAMES[type(default)]
algorithm = default
elif key_type == "DSA":
name = model_settings.CA_DEFAULT_DSA_SIGNATURE_HASH_ALGORITHM
algorithm = model_settings.get_default_dsa_signature_hash_algorithm()
else:
name = model_settings.CA_DEFAULT_SIGNATURE_HASH_ALGORITHM
algorithm = model_settings.get_default_signature_hash_algorithm()
# Make sure that the selected signature hash algorithm works for this backend.
if name not in self.supported_hash_algorithms:
raise ValueError(f"{name}: Algorithm not supported by {self.alias} key backend.")
return algorithm
[docs]
class OCSPKeyBackend(KeyBackendBase):
"""Base class for all OCSP key storage backends."""
[docs]
@abc.abstractmethod
def create_private_key(
self,
ca: "CertificateAuthority",
key_type: ParsableKeyType,
key_size: int | None,
elliptic_curve: ec.EllipticCurve | None,
) -> x509.CertificateSigningRequest:
"""Create the private key.
This method is responsible for creating the private key, storing it, and saving info to
ca.ocsp_key_backend_options so it can be signed. You're not responsible for the public key at all.
"""
[docs]
def get_csr_algorithm(self, key_type: ParsableKeyType) -> SignatureHashAlgorithm | None:
"""Helper function to get a usable signing algorithm for the given key type.
This function can be used to get a default signing algorithm when creating a CSR in
:py:func:`~django_ca.key_backends.OCSPKeyBackend.create_private_key`. You are not obliged to use
this function, it is here merely for convenience.
"""
if key_type in ("Ed25519", "Ed448"):
return None
if key_type == "DSA":
return model_settings.get_default_dsa_signature_hash_algorithm()
return model_settings.get_default_signature_hash_algorithm()
[docs]
def get_default_elliptic_curve(self, ca: "CertificateAuthority") -> ec.EllipticCurve:
"""Get the default elliptic curve used when creating OCSP private keys.
**Note** that you are not usually required to implement this method.
This function is called when generating keys for EC-based CAs or if the user explicitly requested an
EC-based OCSP key but did *not* specify a curve.
The default implementation returns the curve of the CA for EC-based CAs and the
:ref:`CA_DEFAULT_ELLIPTIC_CURVE <settings-ca-default-elliptic-curve>` setting otherwise.
"""
public_key = ca.pub.loaded.public_key()
if isinstance(public_key, ec.EllipticCurvePublicKey):
return public_key.curve
return model_settings.get_default_elliptic_curve()
[docs]
def get_default_key_size(self, ca: "CertificateAuthority") -> int:
"""Get the default key size used when creating OCSP private keys.
**Note** that you are not usually required to implement this method.
This function is called when generating keys for DSA/RSA-based CAs or if the user explicitly requested
an RSA/DSA-based OCSP key but did *not* specify a key size.
The default implementation returns the key size of the CA for RSA/DSA-based CAs and the
:ref:`CA_DEFAULT_KEY_SIZE <settings-ca-default-key-size>` setting otherwise.
"""
public_key = ca.pub.loaded.public_key()
if isinstance(public_key, rsa.RSAPublicKey | dsa.DSAPublicKey):
return public_key.key_size
return model_settings.CA_DEFAULT_KEY_SIZE
[docs]
@abc.abstractmethod
def sign_ocsp_response(
self,
ca: "CertificateAuthority",
builder: OCSPResponseBuilder,
signature_hash_algorithm: SignatureHashAlgorithm | None,
) -> OCSPResponse:
"""Sign the given OCSP response."""
class CryptographyOCSPKeyBackend(OCSPKeyBackend):
"""Specialized base class for backends that can represent the private key as cryptography classes.
Subclasses are only required to implement
:func:`~django_ca.key_backends.CryptographyOCSPKeyBackend.load_private_key_data` to return the raw private
key in either PEM or DER representation (DER is faster and thus preferred). The returned private key
must be a suitable argument for :func:`~cg:cryptography.x509.ocsp.OCSPResponseBuilder.sign`.
"""
@abc.abstractmethod
def load_private_key_data(self, ca: "CertificateAuthority") -> bytes:
"""Load the raw private key as bytes. Both PEM and DER representations are supported."""
# COVERAGE NOTE: Function is implemented in all subclasses.`
# pylint: disable-next=unused-argument
def get_private_key_password(self, ca: "CertificateAuthority") -> bytes | None: # pragma: no cover
"""Get the private key password. The default implementation never returns a password."""
return None
def load_private_key(self, ca: "CertificateAuthority") -> CertificateIssuerPrivateKeyTypes:
"""Function to load the private key and return the raw private key."""
raw_private_key = self.load_private_key_data(ca)
password = self.get_private_key_password(ca) # pylint: disable=assignment-from-none
try:
loaded_key = serialization.load_der_private_key(raw_private_key, password)
except ValueError:
try:
loaded_key = serialization.load_pem_private_key(raw_private_key, password)
except ValueError as ex:
raise ValueError("Could not decrypt private key.") from ex
# Check that the private key is of a supported type
if not isinstance(loaded_key, constants.PRIVATE_KEY_TYPES):
log.error("%s: Unsupported private key type.", type(loaded_key))
raise ValueError(f"{type(loaded_key)}: Unsupported private key type.")
return loaded_key
def sign_ocsp_response(
self,
ca: "CertificateAuthority",
builder: OCSPResponseBuilder,
signature_hash_algorithm: SignatureHashAlgorithm | None,
) -> OCSPResponse:
private_key = self.load_private_key(ca)
return builder.sign(private_key, signature_hash_algorithm)
BackendTypeVar = TypeVar("BackendTypeVar", bound=KeyBackendBase)
class KeyBackendsProxy(Generic[BackendTypeVar]):
"""Base for proxy classes."""
_setting: ClassVar[str]
_type: type[BackendTypeVar]
def __init__(self) -> None:
self._backends = local()
def __getitem__(self, name: str) -> BackendTypeVar:
try:
return cast(BackendTypeVar, self._backends.backends[name])
except AttributeError:
self._backends.backends = {} # first backend is loaded
except KeyError:
pass # this backend not yet loaded
self._backends.backends[name] = self._get_key_backend(name)
# TYPEHINT NOTE: _get_key_backend should not write anything into this variable
return self._backends.backends[name] # type: ignore[no-any-return]
def __iter__(self) -> Iterator[BackendTypeVar]:
for name in getattr(model_settings, self._setting):
yield self[name]
def _reset(self) -> None:
self._backends = local()
def _get_key_backend(self, alias: str) -> BackendTypeVar:
"""Get the key backend with the given alias."""
try:
configuration: KeyBackendConfigurationModel = getattr(model_settings, self._setting)[alias]
except KeyError as ex:
raise ValueError(f"{alias}: key backend is not configured.") from ex
backend = configuration.BACKEND
options = configuration.OPTIONS.copy()
try:
backend_cls = import_string(backend)
except ImportError as ex:
raise ImproperlyConfigured(f"Could not find backend {backend!r}: {ex}") from ex
if not issubclass(backend_cls, self._type):
raise ImproperlyConfigured(f"{backend}: Class does not refer to a key backend.")
# TYPEHINT NOTE: we check for the correct subclass above.
return backend_cls(alias, **options) # type: ignore[no-any-return]
class KeyBackends(KeyBackendsProxy[KeyBackend[BaseModel, BaseModel, BaseModel]]):
"""A key backend handler similar to Django's storages or caches handler."""
_setting = "CA_KEY_BACKENDS"
_type = KeyBackend # type: ignore[assignment]
class OCSPKeyBackends(KeyBackendsProxy[OCSPKeyBackend]):
"""An OCSP key backend handler similar to Django's storages or caches handler."""
_setting = "CA_OCSP_KEY_BACKENDS"
_type = OCSPKeyBackend
key_backends = KeyBackends()
ocsp_key_backends = OCSPKeyBackends()