# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.
"""Key backend using the Django Storages system."""
import base64
import typing
from collections.abc import Sequence
from datetime import datetime
from pathlib import Path
from typing import Any
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric.padding import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric.types import (
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed
from cryptography.hazmat.primitives.serialization import (
Encoding,
PrivateFormat,
load_der_private_key,
load_pem_private_key,
)
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import storages
from django_ca import constants
from django_ca.key_backends import KeyBackend
from django_ca.key_backends.storages.models import (
StoragesCreatePrivateKeyOptions,
StoragesStorePrivateKeyOptions,
StoragesUsePrivateKeyOptions,
)
from django_ca.management.actions import PasswordAction
from django_ca.models import CertificateAuthority
from django_ca.typehints import (
ArgumentGroup,
CertificateExtension,
EllipticCurveName,
ParsableKeyType,
SignatureHashAlgorithm,
)
from django_ca.utils import generate_private_key, get_cert_builder
[docs]
class StoragesBackend(
KeyBackend[StoragesCreatePrivateKeyOptions, StoragesStorePrivateKeyOptions, StoragesUsePrivateKeyOptions]
):
"""The default storage backend that uses Django's file storage API."""
name = "storages"
title = "Store private keys using the Django file storage API"
description = (
"It is most commonly used to store private keys on the file system. Custom file storage backends can "
"be used to store keys on other systems (e.g. a cloud storage system)."
)
use_model = StoragesUsePrivateKeyOptions
supported_key_types: tuple[ParsableKeyType, ...] = constants.PARSABLE_KEY_TYPES
supported_elliptic_curves: tuple[EllipticCurveName, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES)
# Backend options
storage_alias: str
def __init__(self, alias: str, storage_alias: str) -> None:
if storage_alias not in settings.STORAGES:
raise ValueError(f"{alias}: {storage_alias}: Storage alias is not configured.")
super().__init__(alias, storage_alias=storage_alias)
def __eq__(self, other: Any) -> bool:
return isinstance(other, StoragesBackend) and self.storage_alias == other.storage_alias
def __hash__(self) -> int: # pragma: no cover
return hash(self.storage_alias)
def _add_password_argument(self, group: ArgumentGroup) -> None:
group.add_argument(
f"--{self.argparse_prefix}password",
nargs="?",
action=PasswordAction,
metavar="PASSWORD",
prompt="Password for CA: ",
help="Password for the private key of the CA, if stored using the Django storage system.",
)
def _add_path_argument(self, group: ArgumentGroup) -> None:
group.add_argument(
f"--{self.argparse_prefix}path",
type=Path,
default=Path("ca"),
help="Path for storing the private key (in the storage backend, default: %(default)s).",
)
def add_create_private_key_arguments(self, group: ArgumentGroup) -> None:
self._add_path_argument(group)
group.add_argument(
f"--{self.argparse_prefix}password",
nargs="?",
action=PasswordAction,
help="Encrypt the private key with PASSWORD. If PASSWORD is not passed, you will be prompted. By "
"default, the private key is not encrypted.",
)
def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None:
group.add_argument(
f"--{self.argparse_prefix}parent-password",
nargs="?",
action=PasswordAction,
metavar="PASSWORD",
prompt="Password for parent CA: ",
help="Password for the private key of the parent CA, if stored using the Django storage system.",
)
def add_store_private_key_arguments(self, group: ArgumentGroup) -> None:
self._add_password_argument(group)
self._add_path_argument(group)
def add_use_private_key_arguments(self, group: ArgumentGroup) -> None:
self._add_password_argument(group)
def get_create_private_key_options(
self,
key_type: ParsableKeyType,
key_size: int | None,
elliptic_curve: EllipticCurveName | None, # type: ignore[override]
options: dict[str, Any],
) -> StoragesCreatePrivateKeyOptions:
return StoragesCreatePrivateKeyOptions(
key_type=key_type,
password=options[f"{self.options_prefix}password"],
path=options[f"{self.options_prefix}path"],
key_size=key_size,
elliptic_curve=elliptic_curve,
)
def get_store_private_key_options(self, options: dict[str, Any]) -> StoragesStorePrivateKeyOptions:
return StoragesStorePrivateKeyOptions(
password=options[f"{self.options_prefix}password"], path=options[f"{self.options_prefix}path"]
)
def get_use_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> StoragesUsePrivateKeyOptions:
data = {}
if password := options.get(f"{self.options_prefix}password"):
data["password"] = password
return StoragesUsePrivateKeyOptions.model_validate(
data, context={"ca": ca, "backend": self}, strict=True
)
def get_use_parent_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> StoragesUsePrivateKeyOptions:
return StoragesUsePrivateKeyOptions.model_validate(
{"password": options[f"{self.options_prefix}parent_password"]},
context={"ca": ca, "backend": self},
)
def create_private_key(
self,
ca: "CertificateAuthority",
key_type: ParsableKeyType,
options: StoragesCreatePrivateKeyOptions,
) -> tuple[CertificateIssuerPublicKeyTypes, StoragesUsePrivateKeyOptions]:
storage = storages[self.storage_alias]
if options.password is None:
encryption: serialization.KeySerializationEncryption = serialization.NoEncryption()
else:
encryption = serialization.BestAvailableEncryption(options.password)
key = generate_private_key(options.key_size, key_type, options.get_elliptic_curve())
der = key.private_bytes(
encoding=Encoding.DER, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
)
# write private key to file and update ourselves so that we are able to sign certificates
safe_serial = ca.serial.replace(":", "")
path = storage.save(str(options.path / f"{safe_serial}.key"), ContentFile(der))
# Update model instance
ca.key_backend_options = {"path": path}
use_private_key_options = StoragesUsePrivateKeyOptions.model_validate(
{"password": options.password}, context={"ca": ca, "backend": self}
)
return key.public_key(), use_private_key_options
def store_private_key(
self,
ca: "CertificateAuthority",
key: CertificateIssuerPrivateKeyTypes,
certificate: x509.Certificate,
options: StoragesStorePrivateKeyOptions,
) -> None:
storage = storages[self.storage_alias]
if options.password is None:
encryption: serialization.KeySerializationEncryption = serialization.NoEncryption()
else:
encryption = serialization.BestAvailableEncryption(options.password)
der = key.private_bytes(
encoding=Encoding.DER, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
)
safe_serial = ca.serial.replace(":", "")
path = storage.save(str(options.path / f"{safe_serial}.key"), ContentFile(der))
# Update model instance
ca.key_backend_options = {"path": path}
def get_key(
self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions
) -> CertificateIssuerPrivateKeyTypes:
"""The CAs private key as private key."""
storage = storages[self.storage_alias]
path = ca.key_backend_options["path"]
# Load encoded private key data from the file system
# Load encoded private key data from the file system
stream = storage.open(path, mode="rb")
try:
key_data: bytes = stream.read()
finally:
stream.close()
password = None
if use_private_key_options.password is not None:
password = base64.b64decode(use_private_key_options.password.encode("ascii"))
try:
key = typing.cast( # type validated below
CertificateIssuerPrivateKeyTypes, load_der_private_key(key_data, password)
)
except ValueError:
try:
key = typing.cast( # type validated below
CertificateIssuerPrivateKeyTypes, load_pem_private_key(key_data, password)
)
except ValueError as ex2:
# cryptography passes the OpenSSL error directly here and it is notoriously unstable.
raise ValueError("Could not decrypt private key - bad password?") from ex2
if not isinstance(key, constants.PRIVATE_KEY_TYPES): # pragma: no cover
raise ValueError("Private key of this type is not supported.")
return key
def is_usable(
self,
ca: "CertificateAuthority",
use_private_key_options: StoragesUsePrivateKeyOptions | None = None,
) -> bool:
# If key_backend_options is not set or path is not set, it is certainly unusable.
if not ca.key_backend_options or not ca.key_backend_options.get("path"):
return False
# If options are not passed, we return True if the file exists.
if not use_private_key_options:
return storages[self.storage_alias].exists(ca.key_backend_options["path"])
try:
self.get_key(ca, use_private_key_options)
return True
except Exception: # pylint: disable=broad-exception-caught # want to always return bool
return False
def check_usable(
self, ca: "CertificateAuthority", use_private_key_options: StoragesUsePrivateKeyOptions
) -> None:
"""Check if the given CA is usable, raise ValueError if not.
The `options` are the options returned by
:py:func:`~django_ca.key_backends.base.KeyBackend.get_use_private_key_options`. It may be ``None`` in
cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it
knows for sure that it will not be usable, and ``True`` if usability cannot be determined.
"""
if not ca.key_backend_options or not ca.key_backend_options.get("path"):
raise ValueError(f"{ca.key_backend_options}: Path not configured in database.")
try:
self.get_key(ca, use_private_key_options)
except FileNotFoundError as ex:
storage = storages[self.storage_alias]
try:
path = storage.path(ca.key_backend_options["path"])
except NotImplementedError: # pragma: no cover
# Backends that do not implement path() should raise NotImplementedError
path = ca.key_backend_options["path"]
raise ValueError(f"{path}: Private key file not found.") from ex
except ValueError:
raise
except Exception as ex:
raise ValueError(*ex.args) from ex
def sign_data(
self,
ca: "CertificateAuthority",
use_private_key_options: StoragesUsePrivateKeyOptions,
data: bytes,
algorithm: hashes.HashAlgorithm | Prehashed | None = None,
padding: AsymmetricPadding | None = None,
signature_algorithm: ec.EllipticCurveSignatureAlgorithm | None = None,
) -> bytes:
private_key = self.get_key(ca, use_private_key_options)
kwargs: dict[str, Any] = {}
if isinstance(private_key, ec.EllipticCurvePrivateKey):
if signature_algorithm is None:
raise ValueError("signature_algorithm is required for elliptic curve keys.")
kwargs["signature_algorithm"] = signature_algorithm
elif isinstance(private_key, rsa.RSAPrivateKey):
if algorithm is None:
raise ValueError("algorithm is required for RSA keys.")
if padding is None:
raise ValueError("padding is required for RSA keys.")
kwargs["padding"] = padding
kwargs["algorithm"] = algorithm
elif isinstance(private_key, dsa.DSAPrivateKey):
kwargs["algorithm"] = algorithm
if algorithm is None:
raise ValueError("algorithm is required for DSA keys.")
elif algorithm is not None or padding is not None or signature_algorithm is not None:
raise ValueError("algorithm, padding and signature_algorithm are not allowed for this key type.")
return private_key.sign(data, **kwargs)
def sign_certificate(
self,
ca: "CertificateAuthority",
use_private_key_options: StoragesUsePrivateKeyOptions,
public_key: CertificateIssuerPublicKeyTypes,
serial: int,
algorithm: SignatureHashAlgorithm | None,
issuer: x509.Name,
subject: x509.Name,
not_after: datetime,
extensions: Sequence[CertificateExtension],
) -> x509.Certificate:
builder = get_cert_builder(not_after, serial=serial)
builder = builder.public_key(public_key)
builder = builder.issuer_name(issuer)
builder = builder.subject_name(subject)
for extension in extensions:
builder = builder.add_extension(extension.value, critical=extension.critical)
return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)
def sign_certificate_revocation_list(
self,
ca: "CertificateAuthority",
use_private_key_options: StoragesUsePrivateKeyOptions,
builder: x509.CertificateRevocationListBuilder,
algorithm: SignatureHashAlgorithm | None,
) -> x509.CertificateRevocationList:
return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)