Source code for django_ca.key_backends.base

# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Base classes for CA backends."""

import abc
import logging
from collections.abc import Iterator, Sequence
from datetime import datetime
from threading import local
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Generic, Self, TypeVar, cast

from pydantic import BaseModel, Field, model_validator

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric.padding import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric.types import (
    CertificateIssuerPrivateKeyTypes,
    CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.x509.ocsp import OCSPResponse, OCSPResponseBuilder

from django.core.exceptions import ImproperlyConfigured
from django.core.management import CommandParser
from django.utils.module_loading import import_string

from django_ca import constants
from django_ca.conf import KeyBackendConfigurationModel, model_settings
from django_ca.constants import DEFAULT_KEY_BACKEND_KEY, SIGNATURE_HASH_ALGORITHM_NAMES
from django_ca.pydantic.type_aliases import PowerOfTwoInt
from django_ca.typehints import (
    ArgumentGroup,
    CertificateExtension,
    ParsableKeyType,
    SignatureHashAlgorithm,
    SignatureHashAlgorithmName,
)

if TYPE_CHECKING:
    from django_ca.models import CertificateAuthority


log = logging.getLogger(__name__)

CreatePrivateKeyOptionsTypeVar = TypeVar("CreatePrivateKeyOptionsTypeVar", bound=BaseModel)
UsePrivateKeyOptionsTypeVar = TypeVar("UsePrivateKeyOptionsTypeVar", bound=BaseModel)
StorePrivateKeyOptionsTypeVar = TypeVar("StorePrivateKeyOptionsTypeVar", bound=BaseModel)


class CreatePrivateKeyOptionsBaseModel(BaseModel):
    """Base model for creating private keys that shares common fields and validators."""

    key_type: ParsableKeyType
    key_size: Annotated[PowerOfTwoInt, Field(ge=model_settings.CA_MIN_KEY_SIZE)] | None = None

    @model_validator(mode="after")
    def validate_key_size(self) -> Self:
        """Validate that the key size is not set for invalid key types."""
        if self.key_type in ("RSA", "DSA") and self.key_size is None:
            self.key_size = model_settings.CA_DEFAULT_KEY_SIZE
        elif self.key_type not in ("RSA", "DSA") and self.key_size is not None:
            raise ValueError(f"Key size is not supported for {self.key_type} keys.")
        return self


[docs] class KeyBackendBase: """Base class for backends that create private keys (CAs or OCSP delegate responder certificates).""" #: Alias under which this backend is configured under settings.KEY_BACKENDS. alias: str #: Private key types supported by the key backend. This defines the choices for the ``--key-type`` #: argument and the `key_type` parameter in #: :py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options` is guaranteed to be #: one of the named values. supported_key_types: tuple[str, ...] #: Hash algorithms supported by the key backend. This defines the choices for the ``--algorithm`` argument #: and the `algorithm` argument in :py:func:`~django_ca.key_backends.KeyBackend.sign_certificate` is #: guaranteed to be one of the named values. supported_hash_algorithms: tuple[SignatureHashAlgorithmName, ...] = tuple( constants.SIGNATURE_HASH_ALGORITHM_TYPES ) #: Elliptic curves supported by this backend for elliptic curve keys. This defines the choices for the #: ``--elliptic-curve`` parameter and the `elliptic_curve` parameter in #: :py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options` is guaranteed to be #: one of the named values if ``--key-type=EC`` is passed. supported_elliptic_curves: tuple[str, ...] def __init__(self, alias: str, **kwargs: Any) -> None: self.alias = alias for key, value in kwargs.items(): setattr(self, key, value)
[docs] class KeyBackend( KeyBackendBase, Generic[CreatePrivateKeyOptionsTypeVar, StorePrivateKeyOptionsTypeVar, UsePrivateKeyOptionsTypeVar], metaclass=abc.ABCMeta, ): """Base class for all key storage backends. All implementations of a key storage backend must implement this abstract base class. """ #: Title used for the ArgumentGroup in :command:`manage.py init_ca`. title: ClassVar[str] #: Description used for the ArgumentGroup in :command:`manage.py init_ca`. description: ClassVar[str] #: The Pydantic model representing the options used for loading a private key. use_model: type[UsePrivateKeyOptionsTypeVar] #: Prefix for argparse to use for arguments. Empty for the default alias, and `{alias}-` otherwise. argparse_prefix: str = "" #: Prefix to use for loading options. Empty for the default alias, and `{alias}_` otherwise. options_prefix: str = "" def __init__(self, alias: str, **kwargs: Any) -> None: super().__init__(alias, **kwargs) if self.alias != DEFAULT_KEY_BACKEND_KEY: self.argparse_prefix = f"{alias.lower().replace('_', '-')}-" self.options_prefix = f"{alias.lower().replace('-', '_')}_"
[docs] def add_create_private_key_group(self, parser: CommandParser) -> ArgumentGroup | None: """Add an argument group for arguments for private key generation with this backend. By default, the title and description of the argument group is based on :py:attr:`~django_ca.key_backends.base.KeyBackendBase.alias`, :py:attr:`~django_ca.key_backends.KeyBackend.title` and :py:attr:`~django_ca.key_backends.KeyBackend.description`. Return ``None`` if you don't need to create such a group. """ return parser.add_argument_group( f"{self.alias}: {self.title}", f"The backend used with --key-backend={self.alias}. {self.description}", )
[docs] def add_store_private_key_group(self, parser: CommandParser) -> ArgumentGroup | None: """Add an argument group for storing private keys (when importing an existing CA). By default, this method adds the same group as :py:func:`~django_ca.key_backends.KeyBackend.add_create_private_key_group` """ return self.add_create_private_key_group(parser)
[docs] def add_use_private_key_group(self, parser: CommandParser) -> ArgumentGroup | None: """Add an argument group for arguments required for using a private key stored with this backend. By default, the title and description of the argument group is based on :py:attr:`~django_ca.key_backends.base.KeyBackendBase.alias` and :py:attr:`~django_ca.key_backends.KeyBackend.title`. Return ``None`` if you don't need to create such a group. """ return parser.add_argument_group( f"{self.alias} key storage", f"Arguments for using private keys stored with the {self.alias} backend.", )
# pylint: disable-next=unused-argument # default implementation does nothing.
[docs] def add_create_private_key_arguments(self, group: ArgumentGroup) -> None: """Add arguments for private key generation with this backend. Add arguments that can be used for generating private keys with your backend to `group`. The arguments you add here are expected to be loaded (and validated) using :py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options`. """ return None
# pylint: disable-next=unused-argument # default implementation does nothing.
[docs] def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None: """Add arguments for loading the private key of a parent certificate authority. The arguments you add here are expected to be loaded (and validated) using :py:func:`~django_ca.key_backends.KeyBackend.get_use_parent_private_key_options`. """ return None
# pylint: disable-next=unused-argument # default implementation does nothing.
[docs] def add_store_private_key_arguments(self, group: ArgumentGroup) -> None: """Add arguments for storing private keys (when importing an existing CA).""" return None
# pylint: disable=unused-argument # Method may not be overwritten, just providing default here
[docs] def add_use_private_key_arguments(self, group: ArgumentGroup) -> None: """Add arguments required for using private key stored with this backend. The arguments you add here are expected to be loaded (and validated) using :py:func:`~django_ca.key_backends.KeyBackend.get_use_parent_private_key_options`. """ return None
[docs] @abc.abstractmethod def get_create_private_key_options( self, key_type: ParsableKeyType, key_size: int | None, elliptic_curve: str | None, options: dict[str, Any], ) -> CreatePrivateKeyOptionsTypeVar: """Get options to create private keys into a Pydantic model. `options` is the dictionary of arguments from :command:`manage.py init_ca` (including default values). The returned model will be passed to :py:func:`~django_ca.key_backends.KeyBackend.create_private_key`. """
[docs] @abc.abstractmethod def get_use_parent_private_key_options( self, ca: "CertificateAuthority", options: dict[str, Any] ) -> UsePrivateKeyOptionsTypeVar: """Get options to use the private key of a parent certificate authority. The returned model will be used for the certificate authority `ca`. You can pass it as extra context to influence model validation. `options` is the dictionary of arguments to :command:`manage.py init_ca` (including default values). The key backend is expected to be able to sign certificate authorities using the options provided here. """
[docs] @abc.abstractmethod def get_use_private_key_options( self, ca: "CertificateAuthority", options: dict[str, Any] ) -> UsePrivateKeyOptionsTypeVar: """Get options to use the private key of a certificate authority. The returned model will be used for the certificate authority `ca`. You can pass it as extra context to influence model validation. `options` is the dictionary of arguments to :command:`manage.py init_ca` (including default values). The key backend is expected to be able to sign certificates and CRLs using the options provided here. """
[docs] @abc.abstractmethod def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivateKeyOptionsTypeVar: """Get options used when storing private keys."""
[docs] @abc.abstractmethod def is_usable( self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptionsTypeVar | None = None, ) -> bool: """Boolean returning if the given `ca` can be used to sign new certificates (or CRLs). The `options` are the options returned by :py:func:`~django_ca.key_backends.KeyBackend.get_use_private_key_options`. It may be ``None`` in cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it knows for sure that it will not be usable, and ``True`` if usability cannot be determined. """
[docs] @abc.abstractmethod def check_usable( self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptionsTypeVar ) -> None: """Check if the given CA is usable, raise ValueError if not. The `options` are the options returned by :py:func:`~django_ca.key_backends.KeyBackend.get_use_private_key_options`. It may be ``None`` in cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it knows for sure that it will not be usable, and ``True`` if usability cannot be determined. """
[docs] @abc.abstractmethod def create_private_key( self, ca: "CertificateAuthority", key_type: ParsableKeyType, options: CreatePrivateKeyOptionsTypeVar ) -> tuple[CertificateIssuerPublicKeyTypes, UsePrivateKeyOptionsTypeVar]: """Create a private key for the certificate authority. The method is expected to set `key_backend_options` on `ca` with a set of options that can later be used to load the private key. Since this value will be stored in the database, you should not add secrets to `key_backend_options`. Note that `ca` is not yet a *saved* database entity, so fields are only partially populated. """
[docs] @abc.abstractmethod def store_private_key( self, ca: "CertificateAuthority", key: CertificateIssuerPrivateKeyTypes, certificate: x509.Certificate, options: StorePrivateKeyOptionsTypeVar, ) -> None: """Store a private key for the certificate authority. The method is expected to set `key_backend_options` on `ca` with a set of options that can later be used to load the private key. Since this value will be stored in the database, you should not add secrets to `key_backend_options`. Note that `ca` is not yet a *saved* database entity, so fields are only partially populated. """
[docs] def sign_data( self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptionsTypeVar, data: bytes, *, algorithm: hashes.HashAlgorithm | Prehashed | None = None, padding: AsymmetricPadding | None = None, signature_algorithm: ec.EllipticCurveSignatureAlgorithm | None = None, ) -> bytes: """Sign arbitrary data. The `algorithm` parameter is used when using RSA/DSA keys, `padding` is used for RSA keys and `signature_algorithm` is used for Elliptic Curve keys. This function is not used by **django-ca** itself, but may be used by plugins. Backends are not required to implement it, in which case this function raises ``NotImplementedError``. """ raise NotImplementedError("This backend does not support signing data.")
[docs] @abc.abstractmethod def sign_certificate( self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptionsTypeVar, public_key: CertificateIssuerPublicKeyTypes, serial: int, algorithm: SignatureHashAlgorithm | None, issuer: x509.Name, subject: x509.Name, not_after: datetime, # NOTE: Allows any extension, as the function is also used for creating certificate authorities. extensions: Sequence[CertificateExtension], ) -> x509.Certificate: """Sign a certificate."""
[docs] @abc.abstractmethod def sign_certificate_revocation_list( self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptionsTypeVar, builder: x509.CertificateRevocationListBuilder, algorithm: SignatureHashAlgorithm | None, ) -> x509.CertificateRevocationList: """Sign a certificate revocation list request."""
[docs] def validate_signature_hash_algorithm( self, key_type: ParsableKeyType, algorithm: SignatureHashAlgorithm | None, default: SignatureHashAlgorithm | None = None, ) -> SignatureHashAlgorithm | None: """Give a backend the opportunity to check the signature hash algorithm or return the default value. The `algorithm` is the one selected by the user, or ``None`` if no algorithm was selected. The `default` reflects the signature algorithm of a signing certificate authority and is ``None`` only when creating a root certificate authority. Any backend implementation should raise ``ValueError`` if it wants to veto a particular combination of key type and algorithm. """ if key_type not in ("DSA", "RSA", "EC"): if algorithm is not None: raise ValueError(f"{key_type} keys do not allow an algorithm for signing.") return None # Compute the default hash algorithm if algorithm is not None: name = SIGNATURE_HASH_ALGORITHM_NAMES[type(algorithm)] elif default is not None: name = constants.SIGNATURE_HASH_ALGORITHM_NAMES[type(default)] algorithm = default elif key_type == "DSA": name = model_settings.CA_DEFAULT_DSA_SIGNATURE_HASH_ALGORITHM algorithm = model_settings.get_default_dsa_signature_hash_algorithm() else: name = model_settings.CA_DEFAULT_SIGNATURE_HASH_ALGORITHM algorithm = model_settings.get_default_signature_hash_algorithm() # Make sure that the selected signature hash algorithm works for this backend. if name not in self.supported_hash_algorithms: raise ValueError(f"{name}: Algorithm not supported by {self.alias} key backend.") return algorithm
[docs] class OCSPKeyBackend(KeyBackendBase): """Base class for all OCSP key storage backends."""
[docs] @abc.abstractmethod def create_private_key( self, ca: "CertificateAuthority", key_type: ParsableKeyType, key_size: int | None, elliptic_curve: ec.EllipticCurve | None, ) -> x509.CertificateSigningRequest: """Create the private key. This method is responsible for creating the private key, storing it, and saving info to ca.ocsp_key_backend_options so it can be signed. You're not responsible for the public key at all. """
[docs] def get_csr_algorithm(self, key_type: ParsableKeyType) -> SignatureHashAlgorithm | None: """Helper function to get a usable signing algorithm for the given key type. This function can be used to get a default signing algorithm when creating a CSR in :py:func:`~django_ca.key_backends.OCSPKeyBackend.create_private_key`. You are not obliged to use this function, it is here merely for convenience. """ if key_type in ("Ed25519", "Ed448"): return None if key_type == "DSA": return model_settings.get_default_dsa_signature_hash_algorithm() return model_settings.get_default_signature_hash_algorithm()
[docs] def get_default_elliptic_curve(self, ca: "CertificateAuthority") -> ec.EllipticCurve: """Get the default elliptic curve used when creating OCSP private keys. **Note** that you are not usually required to implement this method. This function is called when generating keys for EC-based CAs or if the user explicitly requested an EC-based OCSP key but did *not* specify a curve. The default implementation returns the curve of the CA for EC-based CAs and the :ref:`CA_DEFAULT_ELLIPTIC_CURVE <settings-ca-default-elliptic-curve>` setting otherwise. """ public_key = ca.pub.loaded.public_key() if isinstance(public_key, ec.EllipticCurvePublicKey): return public_key.curve return model_settings.get_default_elliptic_curve()
[docs] def get_default_key_size(self, ca: "CertificateAuthority") -> int: """Get the default key size used when creating OCSP private keys. **Note** that you are not usually required to implement this method. This function is called when generating keys for DSA/RSA-based CAs or if the user explicitly requested an RSA/DSA-based OCSP key but did *not* specify a key size. The default implementation returns the key size of the CA for RSA/DSA-based CAs and the :ref:`CA_DEFAULT_KEY_SIZE <settings-ca-default-key-size>` setting otherwise. """ public_key = ca.pub.loaded.public_key() if isinstance(public_key, rsa.RSAPublicKey | dsa.DSAPublicKey): return public_key.key_size return model_settings.CA_DEFAULT_KEY_SIZE
[docs] @abc.abstractmethod def sign_ocsp_response( self, ca: "CertificateAuthority", builder: OCSPResponseBuilder, signature_hash_algorithm: SignatureHashAlgorithm | None, ) -> OCSPResponse: """Sign the given OCSP response."""
class CryptographyOCSPKeyBackend(OCSPKeyBackend): """Specialized base class for backends that can represent the private key as cryptography classes. Subclasses are only required to implement :func:`~django_ca.key_backends.CryptographyOCSPKeyBackend.load_private_key_data` to return the raw private key in either PEM or DER representation (DER is faster and thus preferred). The returned private key must be a suitable argument for :func:`~cg:cryptography.x509.ocsp.OCSPResponseBuilder.sign`. """ @abc.abstractmethod def load_private_key_data(self, ca: "CertificateAuthority") -> bytes: """Load the raw private key as bytes. Both PEM and DER representations are supported.""" # COVERAGE NOTE: Function is implemented in all subclasses.` # pylint: disable-next=unused-argument def get_private_key_password(self, ca: "CertificateAuthority") -> bytes | None: # pragma: no cover """Get the private key password. The default implementation never returns a password.""" return None def load_private_key(self, ca: "CertificateAuthority") -> CertificateIssuerPrivateKeyTypes: """Function to load the private key and return the raw private key.""" raw_private_key = self.load_private_key_data(ca) password = self.get_private_key_password(ca) # pylint: disable=assignment-from-none try: loaded_key = serialization.load_der_private_key(raw_private_key, password) except ValueError: try: loaded_key = serialization.load_pem_private_key(raw_private_key, password) except ValueError as ex: raise ValueError("Could not decrypt private key.") from ex # Check that the private key is of a supported type if not isinstance(loaded_key, constants.PRIVATE_KEY_TYPES): log.error("%s: Unsupported private key type.", type(loaded_key)) raise ValueError(f"{type(loaded_key)}: Unsupported private key type.") return loaded_key def sign_ocsp_response( self, ca: "CertificateAuthority", builder: OCSPResponseBuilder, signature_hash_algorithm: SignatureHashAlgorithm | None, ) -> OCSPResponse: private_key = self.load_private_key(ca) return builder.sign(private_key, signature_hash_algorithm) BackendTypeVar = TypeVar("BackendTypeVar", bound=KeyBackendBase) class KeyBackendsProxy(Generic[BackendTypeVar]): """Base for proxy classes.""" _setting: ClassVar[str] _type: type[BackendTypeVar] def __init__(self) -> None: self._backends = local() def __getitem__(self, name: str) -> BackendTypeVar: try: return cast(BackendTypeVar, self._backends.backends[name]) except AttributeError: self._backends.backends = {} # first backend is loaded except KeyError: pass # this backend not yet loaded self._backends.backends[name] = self._get_key_backend(name) # TYPEHINT NOTE: _get_key_backend should not write anything into this variable return self._backends.backends[name] # type: ignore[no-any-return] def __iter__(self) -> Iterator[BackendTypeVar]: for name in getattr(model_settings, self._setting): yield self[name] def _reset(self) -> None: self._backends = local() def _get_key_backend(self, alias: str) -> BackendTypeVar: """Get the key backend with the given alias.""" try: configuration: KeyBackendConfigurationModel = getattr(model_settings, self._setting)[alias] except KeyError as ex: raise ValueError(f"{alias}: key backend is not configured.") from ex backend = configuration.BACKEND options = configuration.OPTIONS.copy() try: backend_cls = import_string(backend) except ImportError as ex: raise ImproperlyConfigured(f"Could not find backend {backend!r}: {ex}") from ex if not issubclass(backend_cls, self._type): raise ImproperlyConfigured(f"{backend}: Class does not refer to a key backend.") # TYPEHINT NOTE: we check for the correct subclass above. return backend_cls(alias, **options) # type: ignore[no-any-return] class KeyBackends(KeyBackendsProxy[KeyBackend[BaseModel, BaseModel, BaseModel]]): """A key backend handler similar to Django's storages or caches handler.""" _setting = "CA_KEY_BACKENDS" _type = KeyBackend # type: ignore[assignment] class OCSPKeyBackends(KeyBackendsProxy[OCSPKeyBackend]): """An OCSP key backend handler similar to Django's storages or caches handler.""" _setting = "CA_OCSP_KEY_BACKENDS" _type = OCSPKeyBackend key_backends = KeyBackends() ocsp_key_backends = OCSPKeyBackends()