Source code for django_ca.key_backends.hsm.backend

# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Key storage backend for hardware security modules (HSMs)."""

from collections.abc import Sequence
from datetime import datetime
from typing import TYPE_CHECKING, Any, Final

import pkcs11
from pkcs11 import Session
from pkcs11.util.ec import decode_ec_private_key, decode_ec_public_key
from pkcs11.util.rsa import decode_rsa_private_key, decode_rsa_public_key

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, ed448, ed25519, rsa
from cryptography.hazmat.primitives.asymmetric.padding import AsymmetricPadding
from cryptography.hazmat.primitives.asymmetric.types import (
    CertificateIssuerPrivateKeyTypes,
    CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed

from django.core.management import CommandError

from django_ca import constants
from django_ca.conf import model_settings
from django_ca.key_backends import KeyBackend
from django_ca.key_backends.hsm.keys import (
    PKCS11Ed448PrivateKey,
    PKCS11Ed25519PrivateKey,
    PKCS11EllipticCurvePrivateKey,
    PKCS11PrivateKeyTypes,
    PKCS11RSAPrivateKey,
)
from django_ca.key_backends.hsm.mixins import HSMKeyBackendMixin
from django_ca.key_backends.hsm.models import (
    HSMCreatePrivateKeyOptions,
    HSMStorePrivateKeyOptions,
    HSMUsePrivateKeyOptions,
)
from django_ca.key_backends.hsm.typehints import SupportedKeyType
from django_ca.key_backends.hsm.utils import decode_eddsa_private_key, decode_eddsa_public_key
from django_ca.typehints import (
    ArgumentGroup,
    CertificateExtension,
    EllipticCurveName,
    ParsableKeyType,
    SignatureHashAlgorithm,
    SignatureHashAlgorithmName,
)
from django_ca.utils import get_cert_builder, int_to_hex

if TYPE_CHECKING:
    from django_ca.models import CertificateAuthority


[docs] class HSMBackend( HSMKeyBackendMixin, KeyBackend[HSMCreatePrivateKeyOptions, HSMStorePrivateKeyOptions, HSMUsePrivateKeyOptions], ): """A key backend to create and use private keys in a hardware security module (HSM).""" name = "hsm" title = "Store private keys using a hardware security module (HSM)" description = ( "The private key will be stored on the hardware security module (HSM). The HSM makes sure that the" "private key can never be recovered and thus compromised." ) use_model = HSMUsePrivateKeyOptions supported_key_types: tuple[SupportedKeyType, ...] = ("RSA", "EC", "Ed25519", "Ed448") supported_hash_algorithms: tuple[SignatureHashAlgorithmName, ...] = ( "SHA-224", "SHA-256", "SHA-384", "SHA-512", ) supported_elliptic_curves: tuple[EllipticCurveName, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES) _required_key_backend_options: Final[tuple[str, str, str]] = ("key_label", "key_id", "key_type") def _add_key_label_argument(self, group: ArgumentGroup, prefix: str = "") -> None: group.add_argument( f"--{self.argparse_prefix}{prefix}key-label", type=str, metavar="LABEL", help="%(metavar)s to use for the private key in the HSM.", ) def _add_pin_arguments(self, group: ArgumentGroup, prefix: str = "") -> None: group.add_argument( f"--{self.argparse_prefix}{prefix}so-pin", type=str, metavar="PIN", help="Security officer %(metavar)s to access the HSM.", ) group.add_argument( f"--{self.argparse_prefix}{prefix}user-pin", type=str, metavar="PIN", help="User %(metavar)s to access the HSM.", ) def _get_pins(self, options: dict[str, Any], prefix: str = "") -> tuple[str | None, str | None]: options_prefix = f"{self.options_prefix}{prefix.replace('-', '_')}" argparse_prefix = f"{self.argparse_prefix}{prefix}" so_pin: str | None = options.get(f"{options_prefix}so_pin") if so_pin is None: so_pin = self.so_pin elif so_pin == "": so_pin = None user_pin: str | None = options.get(f"{options_prefix}user_pin") if user_pin is None: user_pin = self.user_pin elif user_pin == "": user_pin = None if so_pin is not None and user_pin is not None: raise CommandError( "Both SO pin and user pin configured. To override a pin from settings, pass " f'--{argparse_prefix}so-pin="" or --{argparse_prefix}user-pin="".' ) return so_pin, user_pin def _get_private_key(self, ca: "CertificateAuthority", session: Session) -> PKCS11PrivateKeyTypes: key_id: str = ca.key_backend_options["key_id"] key_label: str = ca.key_backend_options["key_label"] key_type: SupportedKeyType = ca.key_backend_options["key_type"] if key_type == "RSA": return PKCS11RSAPrivateKey(session, key_id, key_label) if key_type == "Ed448": return PKCS11Ed448PrivateKey(session, key_id, key_label) if key_type == "Ed25519": return PKCS11Ed25519PrivateKey(session, key_id, key_label) if key_type == "EC": return PKCS11EllipticCurvePrivateKey(session, key_id, key_label) raise ValueError(f"{key_type}: Unsupported key type.") def add_create_private_key_arguments(self, group: ArgumentGroup) -> None: self._add_key_label_argument(group) self._add_pin_arguments(group) def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None: self._add_pin_arguments(group, "parent-") def add_store_private_key_arguments(self, group: ArgumentGroup) -> None: self._add_key_label_argument(group) self._add_pin_arguments(group) def add_use_private_key_arguments(self, group: ArgumentGroup) -> None: self._add_pin_arguments(group) def get_create_private_key_options( self, key_type: ParsableKeyType, key_size: int | None, elliptic_curve: str | None, options: dict[str, Any], ) -> HSMCreatePrivateKeyOptions: key_label = options[f"{self.options_prefix}key_label"] if not key_label: raise CommandError( f"--{self.argparse_prefix}key-label is a required option for this key backend." ) so_pin, user_pin = self._get_pins(options) if key_type == "EC" and elliptic_curve is None: # NOTE: Currently all curves supported by cryptography are also supported by this backend. # If this changes, a check should be added here (if the default is not supported by the # backend). elliptic_curve = model_settings.CA_DEFAULT_ELLIPTIC_CURVE return HSMCreatePrivateKeyOptions( key_label=key_label, key_type=key_type, key_size=key_size, elliptic_curve=elliptic_curve, so_pin=so_pin, user_pin=user_pin, ) def get_use_parent_private_key_options( self, ca: "CertificateAuthority", options: dict[str, Any] ) -> HSMUsePrivateKeyOptions: so_pin, user_pin = self._get_pins(options, "parent-") return HSMUsePrivateKeyOptions.model_validate( {"so_pin": so_pin, "user_pin": user_pin}, context={"ca": ca, "backend": self}, strict=True ) def get_use_private_key_options( self, ca: "CertificateAuthority", options: dict[str, Any] ) -> HSMUsePrivateKeyOptions: data = {} so_pin, user_pin = self._get_pins(options) if so_pin is not None: data["so_pin"] = so_pin if user_pin is not None: data["user_pin"] = user_pin return HSMUsePrivateKeyOptions.model_validate(data, context={"ca": ca, "backend": self}, strict=True) def get_store_private_key_options(self, options: dict[str, Any]) -> HSMStorePrivateKeyOptions: key_label = options[f"{self.options_prefix}key_label"] so_pin, user_pin = self._get_pins(options) return HSMStorePrivateKeyOptions.model_validate( {"key_label": key_label, "so_pin": so_pin, "user_pin": user_pin}, context={"backend": self}, strict=True, ) def is_usable( self, ca: "CertificateAuthority", use_private_key_options: HSMUsePrivateKeyOptions | None = None, ) -> bool: if not ca.key_backend_options or not isinstance(ca.key_backend_options, dict): return False for option in self._required_key_backend_options: if not ca.key_backend_options.get(option): return False if use_private_key_options is None: return True try: with self.session( so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin ) as session: self._get_private_key(ca, session) return True except Exception: # pylint: disable=broad-exception-caught # want to always return bool return False def check_usable( self, ca: "CertificateAuthority", use_private_key_options: HSMUsePrivateKeyOptions ) -> None: if not ca.key_backend_options or not isinstance(ca.key_backend_options, dict): raise ValueError("key backend options are not defined.") for option in self._required_key_backend_options: if not ca.key_backend_options.get(option): raise ValueError(f"{option}: Required key option is not defined.") with self.session( so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin ) as session: self._get_private_key(ca, session) def create_private_key( self, ca: "CertificateAuthority", key_type: SupportedKeyType, # type: ignore[override] # more specific here options: HSMCreatePrivateKeyOptions, ) -> tuple[CertificateIssuerPublicKeyTypes, HSMUsePrivateKeyOptions]: key_id = int_to_hex(x509.random_serial_number()) key_label = options.key_label with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: private_key = self._create_private_key( session, key_id, key_label, key_type, key_size=options.key_size, elliptic_curve=options.elliptic_curve, ) public_key = private_key.public_key() ca.key_backend_options = {"key_id": key_id, "key_label": key_label, "key_type": key_type} use_private_key_options = HSMUsePrivateKeyOptions.model_validate( {"so_pin": options.so_pin, "user_pin": options.user_pin}, context={"ca": ca, "backend": self} ) return public_key, use_private_key_options def store_private_key( self, ca: "CertificateAuthority", key: CertificateIssuerPrivateKeyTypes, certificate: x509.Certificate, options: HSMStorePrivateKeyOptions, ) -> None: key_id = int_to_hex(x509.random_serial_number()) public_key = certificate.public_key() if isinstance(key, rsa.RSAPrivateKey): key_type: SupportedKeyType = "RSA" key_der = key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) key_attrs = decode_rsa_private_key(key_der) pub_der = public_key.public_bytes( serialization.Encoding.DER, format=serialization.PublicFormat.PKCS1 ) pub_attrs = decode_rsa_public_key(pub_der) elif isinstance(key, ec.EllipticCurvePrivateKey): key_type = "EC" key_der = key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption(), ) key_attrs = decode_ec_private_key(key_der) pub_der = public_key.public_bytes( serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) pub_attrs = decode_ec_public_key(pub_der) elif isinstance(key, ed448.Ed448PrivateKey | ed25519.Ed25519PrivateKey): if isinstance(key, ed448.Ed448PrivateKey): key_type = "Ed448" else: key_type = "Ed25519" key_der = key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) key_attrs = decode_eddsa_private_key(key_der) pub_der = public_key.public_bytes( serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) pub_attrs = decode_eddsa_public_key(pub_der) else: raise ValueError(f"{key}: Importing a key of this type is not supported.") shared_attrs = { pkcs11.Attribute.TOKEN: True, pkcs11.Attribute.PRIVATE: True, pkcs11.Attribute.ID: key_id.encode(), pkcs11.Attribute.LABEL: options.key_label, } key_attrs.update(shared_attrs) pub_attrs.update(shared_attrs) with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: session.create_object(key_attrs) session.create_object(pub_attrs) ca.key_backend_options = {"key_id": key_id, "key_label": options.key_label, "key_type": key_type} def sign_data( self, ca: "CertificateAuthority", use_private_key_options: HSMUsePrivateKeyOptions, data: bytes, algorithm: hashes.HashAlgorithm | Prehashed | None = None, padding: AsymmetricPadding | None = None, signature_algorithm: ec.EllipticCurveSignatureAlgorithm | None = None, ) -> bytes: kwargs: dict[str, Any] = {} with self.session( so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin ) as session: private_key = self._get_private_key(ca, session) if isinstance(private_key, ec.EllipticCurvePrivateKey): if signature_algorithm is None: raise ValueError("signature_algorithm is required for elliptic curve keys.") kwargs["signature_algorithm"] = signature_algorithm elif isinstance(private_key, rsa.RSAPrivateKey): if algorithm is None: raise ValueError("algorithm is required for RSA keys.") if padding is None: raise ValueError("padding is required for RSA keys.") kwargs["padding"] = padding kwargs["algorithm"] = algorithm elif algorithm is not None or padding is not None or signature_algorithm is not None: raise ValueError( "algorithm, padding and signature_algorithm are not allowed for this key type." ) return private_key.sign(data, **kwargs) def sign_certificate( self, ca: "CertificateAuthority", use_private_key_options: HSMUsePrivateKeyOptions, public_key: CertificateIssuerPublicKeyTypes, serial: int, algorithm: SignatureHashAlgorithm | None, issuer: x509.Name, subject: x509.Name, not_after: datetime, extensions: Sequence[CertificateExtension], ) -> x509.Certificate: builder = get_cert_builder(not_after, serial=serial) builder = builder.public_key(public_key) builder = builder.issuer_name(issuer) builder = builder.subject_name(subject) for extension in extensions: builder = builder.add_extension(extension.value, critical=extension.critical) with self.session( so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin ) as session: private_key = self._get_private_key(ca, session) return builder.sign(private_key=private_key, algorithm=algorithm) def sign_certificate_revocation_list( self, ca: "CertificateAuthority", use_private_key_options: HSMUsePrivateKeyOptions, builder: x509.CertificateRevocationListBuilder, algorithm: SignatureHashAlgorithm | None, ) -> x509.CertificateRevocationList: with self.session( so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin ) as session: private_key = self._get_private_key(ca, session) return builder.sign(private_key=private_key, algorithm=algorithm)