Source code for django_ca.key_backends.storages.backend

# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Key backend using the Django Storages system."""

import base64
import typing
from collections.abc import Sequence
from datetime import datetime
from pathlib import Path
from typing import Any

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric.padding import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric.types import (
    CertificateIssuerPrivateKeyTypes,
    CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.hazmat.primitives.serialization import (
    Encoding,
    PrivateFormat,
    load_der_private_key,
    load_pem_private_key,
)

from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import storages

from django_ca import constants
from django_ca.key_backends import KeyBackend
from django_ca.key_backends.storages.models import (
    StoragesCreatePrivateKeyOptions,
    StoragesStorePrivateKeyOptions,
    StoragesUsePrivateKeyOptions,
)
from django_ca.management.actions import PasswordAction
from django_ca.models import CertificateAuthority
from django_ca.typehints import (
    ArgumentGroup,
    CertificateExtension,
    EllipticCurveName,
    ParsableKeyType,
    SignatureHashAlgorithm,
)
from django_ca.utils import generate_private_key, get_cert_builder


[docs] class StoragesBackend( KeyBackend[StoragesCreatePrivateKeyOptions, StoragesStorePrivateKeyOptions, StoragesUsePrivateKeyOptions] ): """The default storage backend that uses Django's file storage API.""" name = "storages" title = "Store private keys using the Django file storage API" description = ( "It is most commonly used to store private keys on the file system. Custom file storage backends can " "be used to store keys on other systems (e.g. a cloud storage system)." ) use_model = StoragesUsePrivateKeyOptions supported_key_types: tuple[ParsableKeyType, ...] = constants.PARSABLE_KEY_TYPES supported_elliptic_curves: tuple[EllipticCurveName, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES) # Backend options storage_alias: str def __init__(self, alias: str, storage_alias: str) -> None: if storage_alias not in settings.STORAGES: raise ValueError(f"{alias}: {storage_alias}: Storage alias is not configured.") super().__init__(alias, storage_alias=storage_alias) def __eq__(self, other: Any) -> bool: return isinstance(other, StoragesBackend) and self.storage_alias == other.storage_alias def __hash__(self) -> int: # pragma: no cover return hash(self.storage_alias) def _add_password_argument(self, group: ArgumentGroup) -> None: group.add_argument( f"--{self.argparse_prefix}password", nargs="?", action=PasswordAction, metavar="PASSWORD", prompt="Password for CA: ", help="Password for the private key of the CA, if stored using the Django storage system.", ) def _add_path_argument(self, group: ArgumentGroup) -> None: group.add_argument( f"--{self.argparse_prefix}path", type=Path, default=Path("ca"), help="Path for storing the private key (in the storage backend, default: %(default)s).", ) def add_create_private_key_arguments(self, group: ArgumentGroup) -> None: self._add_path_argument(group) group.add_argument( f"--{self.argparse_prefix}password", nargs="?", action=PasswordAction, help="Encrypt the private key with PASSWORD. If PASSWORD is not passed, you will be prompted. By " "default, the private key is not encrypted.", ) def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None: group.add_argument( f"--{self.argparse_prefix}parent-password", nargs="?", action=PasswordAction, metavar="PASSWORD", prompt="Password for parent CA: ", help="Password for the private key of the parent CA, if stored using the Django storage system.", ) def add_store_private_key_arguments(self, group: ArgumentGroup) -> None: self._add_password_argument(group) self._add_path_argument(group) def add_use_private_key_arguments(self, group: ArgumentGroup) -> None: self._add_password_argument(group) def get_create_private_key_options( self, key_type: ParsableKeyType, key_size: int | None, elliptic_curve: EllipticCurveName | None, # type: ignore[override] options: dict[str, Any], ) -> StoragesCreatePrivateKeyOptions: return StoragesCreatePrivateKeyOptions( key_type=key_type, password=options[f"{self.options_prefix}password"], path=options[f"{self.options_prefix}path"], key_size=key_size, elliptic_curve=elliptic_curve, ) def get_store_private_key_options(self, options: dict[str, Any]) -> StoragesStorePrivateKeyOptions: return StoragesStorePrivateKeyOptions( password=options[f"{self.options_prefix}password"], path=options[f"{self.options_prefix}path"] ) def get_use_private_key_options( self, ca: "CertificateAuthority", options: dict[str, Any] ) -> StoragesUsePrivateKeyOptions: data = {} if password := options.get(f"{self.options_prefix}password"): data["password"] = password return StoragesUsePrivateKeyOptions.model_validate( data, context={"ca": ca, "backend": self}, strict=True ) def get_use_parent_private_key_options( self, ca: "CertificateAuthority", options: dict[str, Any] ) -> StoragesUsePrivateKeyOptions: return StoragesUsePrivateKeyOptions.model_validate( {"password": options[f"{self.options_prefix}parent_password"]}, context={"ca": ca, "backend": self}, ) def create_private_key( self, ca: "CertificateAuthority", key_type: ParsableKeyType, options: StoragesCreatePrivateKeyOptions, ) -> tuple[CertificateIssuerPublicKeyTypes, StoragesUsePrivateKeyOptions]: storage = storages[self.storage_alias] if options.password is None: encryption: serialization.KeySerializationEncryption = serialization.NoEncryption() else: encryption = serialization.BestAvailableEncryption(options.password) key = generate_private_key(options.key_size, key_type, options.get_elliptic_curve()) der = key.private_bytes( encoding=Encoding.DER, format=PrivateFormat.PKCS8, encryption_algorithm=encryption ) # write private key to file and update ourselves so that we are able to sign certificates safe_serial = ca.serial.replace(":", "") path = storage.save(str(options.path / f"{safe_serial}.key"), ContentFile(der)) # Update model instance ca.key_backend_options = {"path": path} use_private_key_options = StoragesUsePrivateKeyOptions.model_validate( {"password": options.password}, context={"ca": ca, "backend": self} ) return key.public_key(), use_private_key_options def store_private_key( self, ca: "CertificateAuthority", key: CertificateIssuerPrivateKeyTypes, certificate: x509.Certificate, options: StoragesStorePrivateKeyOptions, ) -> None: storage = storages[self.storage_alias] if options.password is None: encryption: serialization.KeySerializationEncryption = serialization.NoEncryption() else: encryption = serialization.BestAvailableEncryption(options.password) der = key.private_bytes( encoding=Encoding.DER, format=PrivateFormat.PKCS8, encryption_algorithm=encryption ) safe_serial = ca.serial.replace(":", "") path = storage.save(str(options.path / f"{safe_serial}.key"), ContentFile(der)) # Update model instance ca.key_backend_options = {"path": path} def get_key( self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions ) -> CertificateIssuerPrivateKeyTypes: """The CAs private key as private key.""" storage = storages[self.storage_alias] path = ca.key_backend_options["path"] # Load encoded private key data from the file system # Load encoded private key data from the file system stream = storage.open(path, mode="rb") try: key_data: bytes = stream.read() finally: stream.close() password = None if use_private_key_options.password is not None: password = base64.b64decode(use_private_key_options.password.encode("ascii")) try: key = typing.cast( # type validated below CertificateIssuerPrivateKeyTypes, load_der_private_key(key_data, password) ) except ValueError: try: key = typing.cast( # type validated below CertificateIssuerPrivateKeyTypes, load_pem_private_key(key_data, password) ) except ValueError as ex2: # cryptography passes the OpenSSL error directly here and it is notoriously unstable. raise ValueError("Could not decrypt private key - bad password?") from ex2 if not isinstance(key, constants.PRIVATE_KEY_TYPES): # pragma: no cover raise ValueError("Private key of this type is not supported.") return key def is_usable( self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions | None = None, ) -> bool: # If key_backend_options is not set or path is not set, it is certainly unusable. if not ca.key_backend_options or not ca.key_backend_options.get("path"): return False # If options are not passed, we return True if the file exists. if not use_private_key_options: return storages[self.storage_alias].exists(ca.key_backend_options["path"]) try: self.get_key(ca, use_private_key_options) return True except Exception: # pylint: disable=broad-exception-caught # want to always return bool return False def check_usable( self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions ) -> None: """Check if the given CA is usable, raise ValueError if not. The `options` are the options returned by :py:func:`~django_ca.key_backends.base.KeyBackend.get_use_private_key_options`. It may be ``None`` in cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it knows for sure that it will not be usable, and ``True`` if usability cannot be determined. """ if not ca.key_backend_options or not ca.key_backend_options.get("path"): raise ValueError(f"{ca.key_backend_options}: Path not configured in database.") try: self.get_key(ca, use_private_key_options) except FileNotFoundError as ex: storage = storages[self.storage_alias] try: path = storage.path(ca.key_backend_options["path"]) except NotImplementedError: # pragma: no cover # Backends that do not implement path() should raise NotImplementedError path = ca.key_backend_options["path"] raise ValueError(f"{path}: Private key file not found.") from ex except ValueError: raise except Exception as ex: raise ValueError(*ex.args) from ex def sign_data( self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions, data: bytes, algorithm: hashes.HashAlgorithm | Prehashed | None = None, padding: AsymmetricPadding | None = None, signature_algorithm: ec.EllipticCurveSignatureAlgorithm | None = None, ) -> bytes: private_key = self.get_key(ca, use_private_key_options) kwargs: dict[str, Any] = {} if isinstance(private_key, ec.EllipticCurvePrivateKey): if signature_algorithm is None: raise ValueError("signature_algorithm is required for elliptic curve keys.") kwargs["signature_algorithm"] = signature_algorithm elif isinstance(private_key, rsa.RSAPrivateKey): if algorithm is None: raise ValueError("algorithm is required for RSA keys.") if padding is None: raise ValueError("padding is required for RSA keys.") kwargs["padding"] = padding kwargs["algorithm"] = algorithm elif isinstance(private_key, dsa.DSAPrivateKey): kwargs["algorithm"] = algorithm if algorithm is None: raise ValueError("algorithm is required for DSA keys.") elif algorithm is not None or padding is not None or signature_algorithm is not None: raise ValueError("algorithm, padding and signature_algorithm are not allowed for this key type.") return private_key.sign(data, **kwargs) def sign_certificate( self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions, public_key: CertificateIssuerPublicKeyTypes, serial: int, algorithm: SignatureHashAlgorithm | None, issuer: x509.Name, subject: x509.Name, not_after: datetime, extensions: Sequence[CertificateExtension], ) -> x509.Certificate: builder = get_cert_builder(not_after, serial=serial) builder = builder.public_key(public_key) builder = builder.issuer_name(issuer) builder = builder.subject_name(subject) for extension in extensions: builder = builder.add_extension(extension.value, critical=extension.critical) return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm) def sign_certificate_revocation_list( self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions, builder: x509.CertificateRevocationListBuilder, algorithm: SignatureHashAlgorithm | None, ) -> x509.CertificateRevocationList: return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)