# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.
"""Storages."""
import typing
from collections.abc import Sequence
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from pydantic_core.core_schema import ValidationInfo
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric.types import (
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
PrivateFormat,
load_der_private_key,
load_pem_private_key,
)
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import storages
from django_ca import constants
from django_ca.conf import model_settings
from django_ca.key_backends.base import CreatePrivateKeyOptionsBaseModel, KeyBackend
from django_ca.management.actions import PasswordAction
from django_ca.pydantic.type_aliases import Base64EncodedBytes, EllipticCurveTypeAlias
from django_ca.typehints import (
AllowedHashTypes,
ArgumentGroup,
CertificateExtension,
EllipticCurves,
ParsableKeyType,
)
from django_ca.utils import generate_private_key, get_cert_builder
if typing.TYPE_CHECKING:
from django_ca.models import CertificateAuthority
class StoragesCreatePrivateKeyOptions(CreatePrivateKeyOptionsBaseModel):
    """Options for creating a new private key that is stored via the Django file storage API."""

    # NOTE: this model is *not* frozen: the validator below assigns the default elliptic curve to the
    # instance after validation.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Password used to encrypt the generated key; ``None`` means the key is stored unencrypted.
    password: Optional[bytes]
    # Path (inside the storage) below which the key file is saved.
    path: Path
    # Curve for EC keys; filled in from ``CA_DEFAULT_ELLIPTIC_CURVE`` for EC keys when omitted.
    elliptic_curve: Optional[EllipticCurveTypeAlias] = None

    @model_validator(mode="after")
    def validate_elliptic_curve(self) -> "StoragesCreatePrivateKeyOptions":
        """Apply the default curve to EC keys and reject a curve for any other key type.

        Raises ``ValueError`` if an elliptic curve is given for a key type other than ``"EC"``.
        """
        if self.key_type == "EC":
            # EC keys always end up with a curve: fall back to the configured default.
            if self.elliptic_curve is None:
                self.elliptic_curve = model_settings.CA_DEFAULT_ELLIPTIC_CURVE
        elif self.elliptic_curve is not None:
            raise ValueError(f"Elliptic curves are not supported for {self.key_type} keys.")
        return self
class StoragesStorePrivateKeyOptions(BaseModel):
    """Options for storing a private key."""

    # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
    model_config = ConfigDict(frozen=True)

    # Path (inside the storage) below which the key file is saved as ``<serial>.key``.
    path: Path
    # Password used to encrypt the stored key; ``None`` stores the key without encryption.
    password: Optional[bytes]
class StoragesUsePrivateKeyOptions(BaseModel):
    """Options required to load (and thus use) a stored private key."""

    # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
    model_config = ConfigDict(frozen=True)

    # ``validate_default=True`` makes the validator below run even when no password was passed, which
    # is what allows the fallback to ``CA_PASSWORDS``.
    password: Optional[Base64EncodedBytes] = Field(default=None, validate_default=True)

    @field_validator("password", mode="after")
    @classmethod
    def load_default_password(cls, password: Optional[bytes], info: ValidationInfo) -> Optional[bytes]:
        """Fall back to the password configured in ``CA_PASSWORDS`` if none was given.

        The lookup uses the serial of the ``"ca"`` entry of the validation context. An explicitly
        passed password is always returned unchanged.
        """
        # An explicit password wins; without a validation context there is no CA to look up.
        if password is not None or not info.context:
            return password

        authority: Optional["CertificateAuthority"] = info.context.get("ca")
        if authority is None:  # pragma: no cover  # ca is always set, this is just a precaution.
            return password

        # A missing or empty configured password falls through to the (unset) password.
        configured = model_settings.CA_PASSWORDS.get(authority.serial)
        return configured or password
[docs]
class StoragesBackend(
    KeyBackend[StoragesCreatePrivateKeyOptions, StoragesStorePrivateKeyOptions, StoragesUsePrivateKeyOptions]
):
    """The default storage backend that uses Django's file storage API."""

    name = "storages"
    title = "Store private keys using the Django file storage API"
    description = (
        "It is most commonly used to store private keys on the filesystem. Custom file storage backends can "
        "be used to store keys on other systems (e.g. a cloud storage system)."
    )
    use_model = StoragesUsePrivateKeyOptions

    supported_key_types: tuple[ParsableKeyType, ...] = constants.PARSABLE_KEY_TYPES
    supported_elliptic_curves: tuple[EllipticCurves, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES)

    # Backend options
    storage_alias: str

    def __init__(self, alias: str, storage_alias: str) -> None:
        """Initialize the backend.

        Raises ``ValueError`` if `storage_alias` is not a key of ``settings.STORAGES``.
        """
        if storage_alias not in settings.STORAGES:
            raise ValueError(f"{alias}: {storage_alias}: Storage alias is not configured.")
        super().__init__(alias, storage_alias=storage_alias)

    def __eq__(self, other: Any) -> bool:
        """Two storages backends are equal if they use the same storage alias."""
        return isinstance(other, StoragesBackend) and self.storage_alias == other.storage_alias

    def _add_password_argument(self, group: ArgumentGroup) -> None:
        """Add the ``--<prefix>password`` argument for unlocking the private key of a CA."""
        group.add_argument(
            f"--{self.argparse_prefix}password",
            nargs="?",
            action=PasswordAction,
            metavar="PASSWORD",
            prompt="Password for CA: ",
            help="Password for the private key of the CA, if stored using the Django storage system.",
        )

    def _add_path_argument(self, group: ArgumentGroup) -> None:
        """Add the ``--<prefix>path`` argument (defaults to ``ca``)."""
        group.add_argument(
            f"--{self.argparse_prefix}path",
            type=Path,
            default=Path("ca"),
            help="Path for storing the private key (in the storage backend, default: %(default)s).",
        )

    def add_create_private_key_arguments(self, group: ArgumentGroup) -> None:
        """Add the path and (encryption) password arguments used when creating a private key."""
        self._add_path_argument(group)
        group.add_argument(
            f"--{self.argparse_prefix}password",
            nargs="?",
            action=PasswordAction,
            help="Encrypt the private key with PASSWORD. If PASSWORD is not passed, you will be prompted. By "
            "default, the private key is not encrypted.",
        )

    def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None:
        """Add the ``--<prefix>parent-password`` argument for the private key of the parent CA."""
        group.add_argument(
            f"--{self.argparse_prefix}parent-password",
            nargs="?",
            action=PasswordAction,
            metavar="PASSWORD",
            prompt="Password for parent CA: ",
            help="Password for the private key of the parent CA, if stored using the Django storage system.",
        )

    def add_store_private_key_arguments(self, group: ArgumentGroup) -> None:
        """Add the password and path arguments used when storing an existing private key."""
        self._add_password_argument(group)
        self._add_path_argument(group)

    def add_use_private_key_arguments(self, group: ArgumentGroup) -> None:
        """Add the password argument used when using a private key."""
        self._add_password_argument(group)

    def get_create_private_key_options(
        self,
        key_type: ParsableKeyType,
        key_size: Optional[int],
        elliptic_curve: Optional[EllipticCurves],  # type: ignore[override]
        options: dict[str, Any],
    ) -> StoragesCreatePrivateKeyOptions:
        """Build the options model for creating a key from parsed command-line `options`.

        The ``password`` and ``path`` entries (prefixed with ``self.options_prefix``) must be present in
        `options`.
        """
        return StoragesCreatePrivateKeyOptions(
            key_type=key_type,
            password=options[f"{self.options_prefix}password"],
            path=options[f"{self.options_prefix}path"],
            key_size=key_size,
            elliptic_curve=elliptic_curve,
        )

    def get_store_private_key_options(self, options: dict[str, Any]) -> StoragesStorePrivateKeyOptions:
        """Build the options model for storing a key from parsed command-line `options`."""
        return StoragesStorePrivateKeyOptions(
            password=options[f"{self.options_prefix}password"], path=options[f"{self.options_prefix}path"]
        )

    def get_use_private_key_options(
        self, ca: "CertificateAuthority", options: dict[str, Any]
    ) -> StoragesUsePrivateKeyOptions:
        """Build the options model for using the private key of `ca`.

        A missing password entry is treated as ``None``. `ca` is passed as validation context to the
        model.
        """
        return StoragesUsePrivateKeyOptions.model_validate(
            {"password": options.get(f"{self.options_prefix}password")},
            context={"ca": ca, "backend": self},
            strict=True,
        )

    def get_use_parent_private_key_options(
        self, ca: "CertificateAuthority", options: dict[str, Any]
    ) -> StoragesUsePrivateKeyOptions:
        """Build the options model for using the private key of the parent CA `ca`.

        Unlike :py:func:`get_use_private_key_options`, the ``parent_password`` entry must be present in
        `options`.
        """
        return StoragesUsePrivateKeyOptions.model_validate(
            {"password": options[f"{self.options_prefix}parent_password"]},
            context={"ca": ca, "backend": self},
        )

    @staticmethod
    def _key_encryption(password: Optional[bytes]) -> serialization.KeySerializationEncryption:
        """Return the serialization encryption for `password` (no encryption if ``None``)."""
        if password is None:
            return serialization.NoEncryption()
        return serialization.BestAvailableEncryption(password)

    @staticmethod
    def _write_key_file(
        ca: "CertificateAuthority",
        storage: Any,
        key: CertificateIssuerPrivateKeyTypes,
        path: Path,
        encryption: serialization.KeySerializationEncryption,
    ) -> None:
        """Serialize `key` as DER/PKCS8, save it to `storage` and record the path on `ca`."""
        der = key.private_bytes(
            encoding=Encoding.DER, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
        )

        # The file is named after the serial of the CA, with colons removed.
        safe_serial = ca.serial.replace(":", "")
        stored_path = storage.save(str(path / f"{safe_serial}.key"), ContentFile(der))

        # Update model instance with the name returned by the storage.
        ca.key_backend_options = {"path": stored_path}

    def create_private_key(
        self,
        ca: "CertificateAuthority",
        key_type: ParsableKeyType,
        options: StoragesCreatePrivateKeyOptions,
    ) -> tuple[CertificateIssuerPublicKeyTypes, StoragesUsePrivateKeyOptions]:
        """Generate a new private key, store it and return its public key and the options to use it.

        The key is encrypted with ``options.password`` if set. ``ca.key_backend_options`` is updated with
        the path of the stored file.
        """
        storage = storages[self.storage_alias]
        encryption = self._key_encryption(options.password)
        key = generate_private_key(options.key_size, key_type, options.elliptic_curve)

        # write private key to file and update the model so that we are able to sign certificates
        self._write_key_file(ca, storage, key, options.path, encryption)

        use_private_key_options = StoragesUsePrivateKeyOptions.model_validate(
            {"password": options.password}, context={"ca": ca, "backend": self}
        )
        return key.public_key(), use_private_key_options

    def store_private_key(
        self,
        ca: "CertificateAuthority",
        key: CertificateIssuerPrivateKeyTypes,
        certificate: x509.Certificate,
        options: StoragesStorePrivateKeyOptions,
    ) -> None:
        """Store the existing private key `key` for `ca`.

        The key is encrypted with ``options.password`` if set. ``ca.key_backend_options`` is updated with
        the path of the stored file. `certificate` is not used by this backend.
        """
        storage = storages[self.storage_alias]
        encryption = self._key_encryption(options.password)
        self._write_key_file(ca, storage, key, options.path, encryption)

    def get_key(
        self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions
    ) -> CertificateIssuerPrivateKeyTypes:
        """Load and return the private key of `ca` from the storage.

        The file is first parsed as DER and, if that fails with a ``ValueError``, as PEM.

        Raises ``ValueError`` if the key cannot be loaded in either format or is of an unsupported type.
        """
        storage = storages[self.storage_alias]
        path = ca.key_backend_options["path"]

        # Load encoded private key data from the storage; the file is closed when the block is left.
        with storage.open(path, mode="rb") as stream:
            key_data: bytes = stream.read()

        password = use_private_key_options.password
        try:
            key = typing.cast(  # type validated below
                CertificateIssuerPrivateKeyTypes, load_der_private_key(key_data, password)
            )
        except ValueError:
            try:
                key = typing.cast(  # type validated below
                    CertificateIssuerPrivateKeyTypes, load_pem_private_key(key_data, password)
                )
            except ValueError as ex2:
                # cryptography passes the OpenSSL error directly here and it is notoriously unstable.
                raise ValueError("Could not decrypt private key - bad password?") from ex2

        if not isinstance(key, constants.PRIVATE_KEY_TYPES):  # pragma: no cover
            raise ValueError("Private key of this type is not supported.")

        return key

    def is_usable(
        self,
        ca: "CertificateAuthority",
        use_private_key_options: Optional[StoragesUsePrivateKeyOptions] = None,
    ) -> bool:
        """Return whether the private key of `ca` is usable.

        Without `use_private_key_options`, this only checks that the key file exists in the storage.
        Otherwise the key is actually loaded, and any exception while doing so yields ``False``.
        """
        # If key_backend_options is not set or path is not set, it is certainly unusable.
        if not ca.key_backend_options or not ca.key_backend_options.get("path"):
            return False

        # If options are not passed, we return True if the file exists.
        if not use_private_key_options:
            return storages[self.storage_alias].exists(ca.key_backend_options["path"])

        try:
            self.get_key(ca, use_private_key_options)
            return True
        except Exception:  # pylint: disable=broad-exception-caught  # want to always return bool
            return False

    def check_usable(
        self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions
    ) -> None:
        """Check if the private key of the given CA can be loaded, raise ``ValueError`` if not.

        The `use_private_key_options` are the options returned by
        :py:func:`~django_ca.key_backends.base.KeyBackend.get_use_private_key_options`.

        Raises ``ValueError`` if no path is configured for `ca`, if the key file is not found, or if
        loading the key fails for any other reason (other exceptions are re-raised as ``ValueError``
        with the same arguments).
        """
        if not ca.key_backend_options or not ca.key_backend_options.get("path"):
            raise ValueError(f"{ca.key_backend_options}: Path not configured in database.")

        try:
            self.get_key(ca, use_private_key_options)
        except FileNotFoundError as ex:
            storage = storages[self.storage_alias]

            # Prefer the path reported by the storage for the error message, if it has one.
            try:
                path = storage.path(ca.key_backend_options["path"])
            except NotImplementedError:  # pragma: no cover
                # Backends that do not implement path() should raise NotImplementedError
                path = ca.key_backend_options["path"]

            raise ValueError(f"{path}: Private key file not found.") from ex
        except ValueError:
            raise
        except Exception as ex:
            raise ValueError(*ex.args) from ex

    def sign_certificate(
        self,
        ca: "CertificateAuthority",
        use_private_key_options: StoragesUsePrivateKeyOptions,
        public_key: CertificateIssuerPublicKeyTypes,
        serial: int,
        algorithm: Optional[AllowedHashTypes],
        issuer: x509.Name,
        subject: x509.Name,
        expires: datetime,
        extensions: Sequence[CertificateExtension],
    ) -> x509.Certificate:
        """Build a certificate from the given parameters and sign it with the private key of `ca`."""
        builder = get_cert_builder(expires, serial=serial)
        builder = builder.public_key(public_key)
        builder = builder.issuer_name(issuer)
        builder = builder.subject_name(subject)
        for extension in extensions:
            builder = builder.add_extension(extension.value, critical=extension.critical)
        return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)

    def sign_certificate_revocation_list(
        self,
        ca: "CertificateAuthority",
        use_private_key_options: StoragesUsePrivateKeyOptions,
        builder: x509.CertificateRevocationListBuilder,
        algorithm: Optional[AllowedHashTypes],
    ) -> x509.CertificateRevocationList:
        """Sign the certificate revocation list in `builder` with the private key of `ca`."""
        return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)

    def get_ocsp_key_size(
        self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions
    ) -> int:
        """Get the default key size for OCSP keys. This is only called for RSA or DSA keys.

        Returns the key size of the private key of `ca`; raises ``ValueError`` for other key types.
        """
        key = self.get_key(ca, use_private_key_options)
        if not isinstance(key, (rsa.RSAPrivateKey, dsa.DSAPrivateKey)):
            raise ValueError("This function should only be called with RSA/DSA CAs.")
        return key.key_size

    def get_ocsp_key_elliptic_curve(
        self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions
    ) -> ec.EllipticCurve:
        """Get the default elliptic curve for OCSP keys. This is only called for elliptic curve keys.

        Returns the curve of the private key of `ca`; raises ``ValueError`` for other key types.
        """
        key = self.get_key(ca, use_private_key_options)
        if not isinstance(key, ec.EllipticCurvePrivateKey):
            raise ValueError("This function should only be called with EllipticCurve-based CAs.")
        return key.curve