# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not,
# see <http://www.gnu.org/licenses/>.
"""Module for handling certificate profiles."""
import typing
import warnings
from datetime import datetime, timedelta
from threading import local
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from cryptography import x509
from cryptography.hazmat.primitives.hashes import HashAlgorithm
from cryptography.x509.oid import AuthorityInformationAccessOID, ExtensionOID
from django_ca import ca_settings
from django_ca.constants import EXTENSION_DEFAULT_CRITICAL, EXTENSION_KEY_OIDS, EXTENSION_KEYS
from django_ca.deprecation import RemovedInDjangoCA124Warning, deprecate_type
from django_ca.extensions import (
KEY_TO_EXTENSION,
OID_TO_EXTENSION,
Extension,
parse_extension,
serialize_extension,
)
from django_ca.signals import pre_issue_cert, pre_sign_cert
from django_ca.subject import Subject
from django_ca.typehints import (
Expires,
ExtensionMapping,
ParsableExtension,
ParsableHash,
SerializedExtension,
SerializedProfile,
)
from django_ca.utils import (
get_cert_builder,
parse_expires,
parse_general_name,
parse_hash_algorithm,
serialize_name,
x509_name,
)
if typing.TYPE_CHECKING:
from django_ca.models import CertificateAuthority
class Profile:
    """A certificate profile defining properties and extensions of a certificate.

    .. deprecated:: 1.21.0

       Passing a ``dict`` or ``django_ca.subject.Subject`` as ``subject`` is deprecated. The feature will be
       removed in ``django_ca==1.24.0``. Pass a :py:class:`~cg:cryptography.x509.Name` or an iterable of
       tuples instead.

    .. deprecated:: 1.22.0

       Passing a ``django_ca.extensions.Extension`` is deprecated. The feature will be removed in
       ``django_ca==1.24.0``. Pass a :py:class:`~cg:cryptography.x509.Extension` or a serialized version
       instead.

    Instances of this class usually represent profiles defined in :ref:`CA_PROFILES <settings-ca-profiles>`,
    but you can also create your own profile to create a different type of certificate. An instance of this
    class can be used to create a signed certificate based on the given CA::

        >>> Profile('example', subject='/C=AT', extensions={'ocsp_no_check': {}})
        <Profile: example>
    """

    # pylint: disable=too-many-instance-attributes

    # Hash algorithm configured for this profile; ``None`` means "not set by the profile".
    algorithm: Optional[HashAlgorithm] = None
    # Extensions of this profile keyed by OID. A ``None`` value marks an explicitly unset extension.
    extensions: Dict[x509.ObjectIdentifier, Optional[x509.Extension[x509.ExtensionType]]]

    @deprecate_type("subject", (dict, Subject), RemovedInDjangoCA124Warning)
    def __init__(
        self,
        name: str,
        subject: Optional[Union[x509.Name, Iterable[Tuple[str, str]]]] = None,
        algorithm: ParsableHash = None,
        extensions: Optional[
            Dict[str, Optional[Union[ParsableExtension, x509.Extension[x509.ExtensionType]]]]
        ] = None,
        cn_in_san: bool = True,
        expires: Optional[Union[int, timedelta]] = None,
        description: str = "",
        autogenerated: bool = False,
        add_crl_url: bool = True,
        add_ocsp_url: bool = True,
        add_issuer_url: bool = True,
        add_issuer_alternative_name: bool = True,
    ) -> None:
        """Initialize the profile.

        Parameters
        ----------
        name : str
            Name of the profile.
        subject : :py:class:`~cg:cryptography.x509.Name` or iterable of tuples, optional
            Default subject. If ``None``, ``ca_settings.CA_DEFAULT_SUBJECT`` is used.
        algorithm : optional
            Hash algorithm, parsed with ``parse_hash_algorithm()`` if given.
        extensions : dict, optional
            Extensions keyed by extension key. Values may be :py:class:`~cg:cryptography.x509.Extension`
            instances, serialized extensions or ``None`` (which explicitly unsets the extension).
        cn_in_san : bool, optional
            Default for adding the CommonName to the SubjectAlternativeName extension.
        expires : int or timedelta, optional
            Default validity period; an ``int`` is interpreted as a number of days. Falls back to
            ``ca_settings.CA_DEFAULT_EXPIRES`` if not given.
        description : str, optional
            Description of the profile.
        autogenerated : bool, optional
            Stored as-is on the instance.
        add_crl_url, add_ocsp_url, add_issuer_url, add_issuer_alternative_name : bool, optional
            Defaults for adding the respective values from the CA when creating a certificate.
        """
        # pylint: disable=too-many-arguments
        self.name = name

        if isinstance(expires, int):
            expires = timedelta(days=expires)
        if extensions is None:
            extensions = {}

        if subject is None:
            subject = ca_settings.CA_DEFAULT_SUBJECT

        if isinstance(subject, x509.Name):
            self.subject = subject
        elif isinstance(subject, Subject):  # pragma: django-ca<=1.24
            self.subject = subject.name
        else:
            self.subject = x509_name(subject)

        if algorithm is not None:
            self.algorithm = parse_hash_algorithm(algorithm)

        self.cn_in_san = cn_in_san
        self.expires = expires or ca_settings.CA_DEFAULT_EXPIRES
        self.add_crl_url = add_crl_url
        self.add_issuer_url = add_issuer_url
        self.add_ocsp_url = add_ocsp_url
        self.add_issuer_alternative_name = add_issuer_alternative_name
        self.description = description
        self.autogenerated = autogenerated

        # Instantiating mutable dict here and not as class attribute to improve isolation. The type is
        # declared once on the class (keys are object identifiers, not strings).
        self.extensions = {}

        # cast extensions to their respective classes
        for key, extension in extensions.items():
            if isinstance(extension, x509.Extension):
                self.extensions[extension.oid] = extension
            elif isinstance(extension, Extension):
                warnings.warn(
                    f"Passing {extension.__class__.__name__} is deprecated.",
                    RemovedInDjangoCA124Warning,
                    stacklevel=2,
                )
                self.extensions[extension.oid] = extension.as_extension()
            elif extension is None:
                # None value explicitly deactivates/unsets an extension in the admin interface
                self.extensions[EXTENSION_KEY_OIDS[key]] = None
            else:
                parsed_extension = parse_extension(key, extension)
                self.extensions[parsed_extension.oid] = parsed_extension

        # set some sane extension defaults
        self.extensions.setdefault(
            ExtensionOID.BASIC_CONSTRAINTS,
            x509.Extension(
                oid=ExtensionOID.BASIC_CONSTRAINTS,
                critical=EXTENSION_DEFAULT_CRITICAL[ExtensionOID.BASIC_CONSTRAINTS],
                value=x509.BasicConstraints(ca=False, path_length=None),
            ),
        )

    def __eq__(self, value: object) -> bool:
        """Compare with another profile (or the default profile proxy).

        All configuration values except ``autogenerated`` are compared. The algorithm matches if the other
        algorithm is an instance of the type of this profile's algorithm.
        """
        if not isinstance(value, (Profile, DefaultProfileProxy)):
            return False
        algo = isinstance(value.algorithm, type(self.algorithm))

        return (
            self.name == value.name
            and self.subject == value.subject
            and algo
            and self.extensions == value.extensions
            and self.cn_in_san == value.cn_in_san
            and self.expires == value.expires
            and self.add_crl_url == value.add_crl_url
            and self.add_issuer_url == value.add_issuer_url
            and self.add_ocsp_url == value.add_ocsp_url
            and self.add_issuer_alternative_name == value.add_issuer_alternative_name
            and self.description == value.description
        )

    def __repr__(self) -> str:
        return f"<Profile: {self.name}>"

    def __str__(self) -> str:
        return repr(self)

    def _get_extensions(
        self, extensions: Optional[Iterable[x509.Extension[x509.ExtensionType]]]
    ) -> ExtensionMapping:
        """Merge the extensions of this profile with the extensions passed by the caller.

        Passed extensions override extensions of the same type from the profile. Extensions that end up
        with a ``None`` value (explicitly unset) are dropped from the result.

        Raises
        ------
        ValueError
            If a (deprecated) dict is passed and a value does not match the type implied by its key, or is
            neither an extension nor ``None``.
        """
        extensions_msg = "Passing a django_ca.extensions.Extension instance deprecated."

        # NOTE: Remove Optional from dict values once support for dicts is removed
        extensions_update: Dict[x509.ObjectIdentifier, Optional[x509.Extension[x509.ExtensionType]]] = {}

        if isinstance(extensions, dict):
            warnings.warn(
                "Passing a dict for extensions is deprecated.", RemovedInDjangoCA124Warning, stacklevel=2
            )

            # Convert deprecated django_ca.extensions.Extension class
            for key, ext in extensions.items():
                if isinstance(ext, Extension):
                    warnings.warn(extensions_msg, RemovedInDjangoCA124Warning, stacklevel=2)
                    converted_extension = ext.as_extension()

                    expected = KEY_TO_EXTENSION[key]
                    if expected.oid != converted_extension.oid:
                        raise ValueError(f"extensions[{key}] is not of type {expected.__name__}")
                    extensions_update[converted_extension.oid] = converted_extension
                elif isinstance(ext, x509.Extension):
                    if EXTENSION_KEY_OIDS[key] != ext.oid:
                        raise ValueError(f"extensions[{key}] is not of expected type")
                    extensions_update[ext.oid] = ext
                elif ext is None:
                    extensions_update[EXTENSION_KEY_OIDS[key]] = None
                else:
                    raise ValueError(f"{ext}: Must be a cryptography.x509.Extension instance or None")
        elif extensions is not None:  # should be a list
            # Convert deprecated django_ca.extensions.Extension class
            for ext_item in extensions:
                if isinstance(ext_item, x509.Extension):
                    extensions_update[ext_item.oid] = ext_item
                else:
                    warnings.warn(extensions_msg, RemovedInDjangoCA124Warning, stacklevel=2)
                    extensions_update[ext_item.oid] = ext_item.as_extension()

        cert_extensions = self.extensions.copy()
        cert_extensions.update(extensions_update)

        # NOTE: this line is no longer necessary once support for passing dicts is dropped, as values can no
        #       longer be None.
        filtered_cert_extensions = {k: v for k, v in cert_extensions.items() if v is not None}

        return filtered_cert_extensions

    @deprecate_type("subject", (dict, str, Subject), RemovedInDjangoCA124Warning)
    @deprecate_type("extensions", dict, RemovedInDjangoCA124Warning)
    def create_cert(  # pylint: disable=too-many-arguments,too-many-locals
        self,
        ca: "CertificateAuthority",
        csr: x509.CertificateSigningRequest,
        subject: Optional[x509.Name] = None,
        expires: Expires = None,
        algorithm: Optional[HashAlgorithm] = None,
        extensions: Optional[Iterable[x509.Extension[x509.ExtensionType]]] = None,
        cn_in_san: Optional[bool] = None,
        add_crl_url: Optional[bool] = None,
        add_ocsp_url: Optional[bool] = None,
        add_issuer_url: Optional[bool] = None,
        add_issuer_alternative_name: Optional[bool] = None,
        password: Optional[Union[str, bytes]] = None,
    ) -> x509.Certificate:
        """Create a x509 certificate based on this profile, the passed CA and input parameters.

        .. deprecated:: 1.22.0

           * Passing a ``~django_ca.subject.Subject`` as `subject` is now deprecated. The feature will be
             removed in ``django_ca==1.24``. Pass a :py:class:`~cg:cryptography.x509.Name` instance instead.
           * Passing a ``dict`` for `extensions` is now deprecated, pass a list instead.
           * Passing ``django_ca.extensions.Extension`` in `extensions` is now deprecated, pass regular
             :py:class:`~cg:cryptography.x509.Extension` objects instead.

        This function is the core function used to create x509 certificates. In its simplest form, you only
        need to pass a ca, a CSR and a subject to get a valid certificate::

            >>> profile = get_profile('webserver')
            >>> profile.create_cert(ca, csr, subject=x509_name('/CN=example.com'))  # doctest: +ELLIPSIS
            <Certificate(subject=<Name(...,CN=example.com)>, ...)>

        The function will add CRL, OCSP, Issuer and IssuerAlternativeName URLs based on the CA if the profile
        has the *add_crl_url*, *add_ocsp_url* and *add_issuer_url* and *add_issuer_alternative_name* values
        set. Parameters to this function with the same name allow you override this behavior.

        The function allows you to override profile values using the *expires* and *algorithm* values. You can
        pass additional *extensions* as a list, which will override any extensions from the profile, but the
        CA passed will append to these extensions unless the *add_...* values are ``False``.

        Parameters
        ----------
        ca : :py:class:`~django_ca.models.CertificateAuthority`
            The CA to sign the certificate with.
        csr : :py:class:`~cg:cryptography.x509.CertificateSigningRequest`
            The CSR for the certificate.
        subject : :py:class:`~cg:cryptography.x509.Name`, optional
            Subject for the certificate. The value will be merged with the subject of the profile. If not
            given, the certificate's subject will be identical to the subject from the profile.
        expires : int or datetime or timedelta, optional
            Override when this certificate will expire.
        algorithm : :py:class:`~cg:cryptography.hazmat.primitives.hashes.HashAlgorithm`, optional
            Override the hash algorithm used when signing the certificate.
        extensions : list of :py:class:`~cg:cryptography.x509.Extension`
            List of additional extensions to set for the certificate. Note that values from the CA might
            update the passed extensions: For example, if you pass an
            :py:class:`~cg:cryptography.x509.IssuerAlternativeName` extension, *add_issuer_alternative_name*
            is ``True`` and the passed CA has an IssuerAlternativeName set, that value will be appended to the
            extension you pass here.
        cn_in_san : bool, optional
            Override if the CommonName should be added as an SubjectAlternativeName. If not passed, the value
            set in the profile is used.
        add_crl_url : bool, optional
            Override if any CRL URLs from the CA should be added to the certificate. If not passed, the value
            set in the profile is used.
        add_ocsp_url : bool, optional
            Override if any OCSP URLs from the CA should be added to the certificate. If not passed, the value
            set in the profile is used.
        add_issuer_url : bool, optional
            Override if any Issuer URLs from the CA should be added to the certificate. If not passed, the
            value set in the profile is used.
        add_issuer_alternative_name : bool, optional
            Override if any IssuerAlternativeNames from the CA should be added to the certificate. If not
            passed, the value set in the profile is used.
        password: bytes or str, optional
            The password to the private key of the CA.

        Returns
        -------
        cryptography.x509.Certificate
            The signed certificate.

        Raises
        ------
        ValueError
            If the final certificate would have neither a CommonName nor a SubjectAlternativeName
            extension, or if the CommonName cannot be parsed as a SubjectAlternativeName.
        """
        # Get overrides values from profile if not passed as parameter
        if cn_in_san is None:
            cn_in_san = self.cn_in_san
        if add_crl_url is None:
            add_crl_url = self.add_crl_url
        if add_ocsp_url is None:
            add_ocsp_url = self.add_ocsp_url
        if add_issuer_url is None:
            add_issuer_url = self.add_issuer_url
        if add_issuer_alternative_name is None:
            add_issuer_alternative_name = self.add_issuer_alternative_name

        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message="django_ca.subject.Subject will be removed in 1.24.0.",
                category=RemovedInDjangoCA124Warning,
            )
            cert_subject = Subject(self.subject)

        cert_extensions = self._get_extensions(extensions)

        self._update_from_ca(
            ca,
            cert_extensions,
            add_crl_url=add_crl_url,
            add_ocsp_url=add_ocsp_url,
            add_issuer_url=add_issuer_url,
            add_issuer_alternative_name=add_issuer_alternative_name,
        )

        if not isinstance(subject, Subject):
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    message="django_ca.subject.Subject will be removed in 1.24.0.",
                    category=RemovedInDjangoCA124Warning,
                )
                converted_subject = Subject(subject)  # NOTE: also accepts None
        else:  # pragma: only django_ca<=1.24
            converted_subject = subject

        cert_subject.update(converted_subject)

        # The profile/CA algorithm is only applied if the CA itself has an algorithm set; otherwise the
        # value stays None and is passed to the builder unchanged.
        if algorithm is None and ca.algorithm:
            if self.algorithm is not None:
                algorithm = self.algorithm
            else:
                algorithm = ca.algorithm

        # Make sure that expires is a fixed timestamp
        expires = self.get_expires(expires)

        # Finally, update SAN with the current CN, if set and requested
        self._update_san_from_cn(cn_in_san, subject=cert_subject, extensions=cert_extensions)

        # Validate the *merged* subject: the CommonName may come from the profile or may have been derived
        # from the SubjectAlternativeName above, not only from the subject passed by the caller.
        if not cert_subject.get("CN") and not cert_extensions.get(ExtensionOID.SUBJECT_ALTERNATIVE_NAME):
            raise ValueError("Must name at least a CN or a subjectAlternativeName.")

        # Convert extensions to legacy classes so that we can send the deprecated signal
        with warnings.catch_warnings():  # disable warnings as constructors raise a warning
            warnings.simplefilter("ignore")
            cert_extensions_old = [OID_TO_EXTENSION[ext.oid](ext) for ext in cert_extensions.values()]

        pre_issue_cert.send(
            sender=self.__class__,
            ca=ca,
            csr=csr,
            expires=expires,
            algorithm=algorithm,
            subject=cert_subject,
            extensions=cert_extensions_old,
            password=password,
        )

        # Send the extensions as they will be added to the certificate (profile, caller and CA merged),
        # mirroring what pre_issue_cert receives in legacy form, instead of the raw function argument.
        pre_sign_cert.send(
            sender=self.__class__,
            ca=ca,
            csr=csr,
            expires=expires,
            algorithm=algorithm,
            subject=cert_subject.name,
            extensions=list(cert_extensions.values()),
            password=password,
        )

        public_key = csr.public_key()
        builder = get_cert_builder(expires)
        builder = builder.public_key(public_key)
        builder = builder.issuer_name(ca.subject)
        builder = builder.subject_name(cert_subject.name)

        for extension in cert_extensions.values():
            builder = builder.add_extension(extension.value, critical=extension.critical)

        # Add the SubjectKeyIdentifier
        if ExtensionOID.SUBJECT_KEY_IDENTIFIER not in cert_extensions:
            builder = builder.add_extension(
                x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False
            )

        return builder.sign(private_key=ca.key(password), algorithm=algorithm)

    def get_expires(self, expires: Expires) -> datetime:
        """Get expiry for the given expiry timestamp.

        If `expires` is ``None``, the value configured for this profile is used.
        """
        if expires is None:
            expires = self.expires
        return parse_expires(expires)

    def serialize(self) -> SerializedProfile:
        """Function to serialize a profile.

        This function is called by the admin interface to retrieve profile information to the browser, so
        the value returned by this function should always be JSON serializable.
        """
        extensions: Dict[str, Optional[SerializedExtension]] = {}

        for key, extension in self.extensions.items():
            if extension is None:
                # None value explicitly deactivates/unsets an extension in the admin interface
                extensions[EXTENSION_KEYS[key]] = None
            else:
                extensions[EXTENSION_KEYS[key]] = serialize_extension(extension)

        data: SerializedProfile = {
            "cn_in_san": self.cn_in_san,
            "description": self.description,
            "subject": serialize_name(self.subject),
            "extensions": extensions,
        }

        return data

    def _update_authority_information_access(
        self,
        extensions: ExtensionMapping,
        ca_extensions: ExtensionMapping,
        add_issuer_url: bool,
        add_ocsp_url: bool,
    ) -> None:
        """Merge the AuthorityInformationAccess extension of the CA into `extensions` (in place).

        CA Issuers and OCSP descriptions of the CA are only used if the respective flag is ``True``.
        Descriptions already present in `extensions` are kept; if the certificate already has the
        extension, its critical flag wins. Identical access descriptions are only added once.
        """
        oid = ExtensionOID.AUTHORITY_INFORMATION_ACCESS

        # If there is no extension from the CA there is nothing to merge.
        if oid not in ca_extensions:
            return
        ca_aia_ext = typing.cast(x509.Extension[x509.AuthorityInformationAccess], ca_extensions[oid])
        critical = ca_aia_ext.critical

        candidates: List[x509.AccessDescription] = []
        if add_issuer_url is True:
            candidates += [
                ad for ad in ca_aia_ext.value if ad.access_method == AuthorityInformationAccessOID.CA_ISSUERS
            ]
        if add_ocsp_url is True:
            candidates += [
                ad for ad in ca_aia_ext.value if ad.access_method == AuthorityInformationAccessOID.OCSP
            ]

        # Add ADs from certificate
        if oid in extensions:
            cert_aia_ext = typing.cast(x509.Extension[x509.AuthorityInformationAccess], extensions[oid])
            candidates += list(cert_aia_ext.value)
            critical = cert_aia_ext.critical

        # Drop duplicates (e.g. the certificate naming the same URL as the CA) while keeping the order.
        access_descriptions: List[x509.AccessDescription] = []
        for access_description in candidates:
            if access_description not in access_descriptions:
                access_descriptions.append(access_description)

        # Finally sort by OID so that we have more predictable behavior
        access_descriptions = sorted(access_descriptions, key=lambda ad: ad.access_method.dotted_string)

        if access_descriptions:
            extensions[oid] = x509.Extension(
                oid=oid,
                critical=critical,
                value=x509.AuthorityInformationAccess(access_descriptions),
            )

    def _update_crl_distribution_points(
        self, extensions: ExtensionMapping, ca_extensions: ExtensionMapping
    ) -> None:
        """Update the CRLDistributionPoints extension with the endpoint from the Certificate Authority."""
        oid = ExtensionOID.CRL_DISTRIBUTION_POINTS
        if oid not in ca_extensions:
            return
        ca_crldp_ext = typing.cast(x509.Extension[x509.CRLDistributionPoints], ca_extensions[oid])

        if oid in extensions:
            ca_crldp = ca_crldp_ext.value
            # NOTE(review): assumes the first distribution point of the CA has a full name - confirm
            # against how the CA generates this extension.
            ca_name = ca_crldp[0].full_name[0]

            cert_crldp = typing.cast(x509.CRLDistributionPoints, extensions[oid].value)
            for dpoint in cert_crldp:
                if dpoint.full_name and ca_name in dpoint.full_name:
                    break
            else:  # loop exits normally, so break not reached --> dpoint not in existing extension
                ext_value = x509.CRLDistributionPoints(list(ca_crldp) + list(cert_crldp))
                extensions[oid] = x509.Extension(oid=oid, critical=extensions[oid].critical, value=ext_value)
        else:
            extensions[oid] = ca_crldp_ext

    def _update_issuer_alternative_name(
        self, extensions: ExtensionMapping, ca_extensions: ExtensionMapping
    ) -> None:
        """Update the IssuerAlternativeName extension with the names from the Certificate Authority.

        If `extensions` already has the extension, the names of the CA are prepended and the critical flag
        of the existing extension is kept. Otherwise the extension of the CA is used as is.
        """
        oid = ExtensionOID.ISSUER_ALTERNATIVE_NAME
        if oid not in ca_extensions:
            return

        if oid in extensions:
            ca_ian = typing.cast(x509.IssuerAlternativeName, ca_extensions[oid].value)
            cert_ian = typing.cast(x509.IssuerAlternativeName, extensions[oid].value)
            ext_value = x509.IssuerAlternativeName(list(ca_ian) + list(cert_ian))
            extensions[oid] = x509.Extension(oid=oid, critical=extensions[oid].critical, value=ext_value)
        else:
            extensions[oid] = ca_extensions[oid]

    def _update_from_ca(
        self,
        ca: "CertificateAuthority",
        extensions: ExtensionMapping,
        add_crl_url: bool,
        add_ocsp_url: bool,
        add_issuer_url: bool,
        add_issuer_alternative_name: bool,
    ) -> None:
        """Update data from the given CA.

        * Sets the AuthorityKeyIdentifier extension
        * Sets the OCSP url if add_ocsp_url is True
        * Sets a CRL URL if add_crl_url is True
        * Adds an IssuerAlternativeName if add_issuer_alternative_name is True

        """
        ca_extensions = ca.extensions_for_certificate

        # client can set its own AuthorityKeyIdentifier extension (currently used for the alt-extensions
        # certificate when creating fixtures).
        extensions.setdefault(
            ExtensionOID.AUTHORITY_KEY_IDENTIFIER, ca.get_authority_key_identifier_extension()
        )

        if add_crl_url is not False:
            self._update_crl_distribution_points(extensions, ca_extensions)

        self._update_authority_information_access(
            extensions, ca_extensions, add_issuer_url=add_issuer_url, add_ocsp_url=add_ocsp_url
        )

        if add_issuer_alternative_name is not False:
            self._update_issuer_alternative_name(extensions, ca_extensions)

    def _update_san_from_cn(self, cn_in_san: bool, subject: Subject, extensions: ExtensionMapping) -> None:
        """Synchronize the CommonName and the SubjectAlternativeName extension (both updated in place).

        * If the subject has a CommonName and `cn_in_san` is ``True``, the CommonName is added to the
          SubjectAlternativeName extension (which is created if it does not exist).
        * If the subject has no CommonName but a SubjectAlternativeName extension exists, the first DNS
          name, URI or IP address in it is used as CommonName.

        Raises
        ------
        ValueError
            If the CommonName cannot be parsed as a general name.
        """
        if subject.get("CN") and cn_in_san is True:
            try:
                common_name = parse_general_name(typing.cast(str, subject["CN"]))
            except ValueError as e:
                raise ValueError(
                    f"{subject['CN']}: Could not parse CommonName as subjectAlternativeName."
                ) from e

            if ExtensionOID.SUBJECT_ALTERNATIVE_NAME in extensions:
                san_ext = typing.cast(
                    x509.Extension[x509.SubjectAlternativeName],
                    extensions[ExtensionOID.SUBJECT_ALTERNATIVE_NAME],
                )

                if common_name not in san_ext.value:
                    extensions[ExtensionOID.SUBJECT_ALTERNATIVE_NAME] = x509.Extension(
                        oid=ExtensionOID.SUBJECT_ALTERNATIVE_NAME,
                        critical=san_ext.critical,
                        value=x509.SubjectAlternativeName(list(san_ext.value) + [common_name]),
                    )
            else:
                extensions[ExtensionOID.SUBJECT_ALTERNATIVE_NAME] = x509.Extension(
                    oid=ExtensionOID.SUBJECT_ALTERNATIVE_NAME,
                    critical=False,
                    value=x509.SubjectAlternativeName([common_name]),
                )

        elif not subject.get("CN") and ExtensionOID.SUBJECT_ALTERNATIVE_NAME in extensions:
            san_ext = typing.cast(
                x509.Extension[x509.SubjectAlternativeName], extensions[ExtensionOID.SUBJECT_ALTERNATIVE_NAME]
            )
            cn_from_san = next(
                (
                    str(val.value)
                    for val in san_ext.value
                    if isinstance(val, (x509.DNSName, x509.UniformResourceIdentifier, x509.IPAddress))
                ),
                None,
            )
            if cn_from_san is not None:
                subject["CN"] = cn_from_san
def get_profile(name: Optional[str] = None) -> Profile:
    """Create the profile configured under the given name.

    Raises ``KeyError`` if the profile is not defined.

    Parameters
    ----------
    name : str, optional
        The name of the profile. If ``None``, the profile configured by
        :ref:`CA_DEFAULT_PROFILE <settings-ca-default-profile>` is used.
    """
    profile_name = ca_settings.CA_DEFAULT_PROFILE if name is None else name
    profile_config = ca_settings.CA_PROFILES[profile_name]
    return Profile(profile_name, **profile_config)
class Profiles:
    """A profile handler similar to Djangos CacheHandler.

    Profiles are created on first access and then kept in a ``threading.local`` store.
    """

    def __init__(self) -> None:
        self._profiles = local()

    def __getitem__(self, name: Optional[str]) -> Profile:
        # ``None`` selects the configured default profile.
        key = ca_settings.CA_DEFAULT_PROFILE if name is None else name

        # The store has no ``profiles`` attribute until the first lookup through this local.
        cache = getattr(self._profiles, "profiles", None)
        if cache is None:
            cache = {}
            self._profiles.profiles = cache

        if key not in cache:
            cache[key] = get_profile(key)
        return typing.cast(Profile, cache[key])

    def _reset(self) -> None:
        # Replace the store with a fresh one, discarding all cached profiles.
        self._profiles = local()
# Module-level profile handler instance; profiles are looked up by name via ``profiles[name]``.
profiles = Profiles()
class DefaultProfileProxy:
    """Default profile proxy, similar to Djangos DefaultCacheProxy.

    Attribute access and comparisons are forwarded to the profile currently configured as
    ``CA_DEFAULT_PROFILE``.

    .. NOTE:: We don't implement setattr/delattr, because Profiles are supposed to be read-only anyway.
    """

    def __getattr__(self, name: str) -> Any:
        default_profile = profiles[ca_settings.CA_DEFAULT_PROFILE]
        return getattr(default_profile, name)

    def __eq__(self, other: object) -> bool:
        if isinstance(other, (DefaultProfileProxy, Profile)):
            return profiles[ca_settings.CA_DEFAULT_PROFILE] == other
        return False

    def __repr__(self) -> str:
        # ``self.name`` is resolved through __getattr__, i.e. it is the name of the default profile.
        return f"<DefaultProfile: {self.name}>"

    def __str__(self) -> str:
        text = repr(self)
        return text
# Module-level proxy object that forwards to the profile named by ``CA_DEFAULT_PROFILE``.
profile = DefaultProfileProxy()