Source code for django_ca.models

# -*- coding: utf-8 -*-
#
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca.  If not,
# see <http://www.gnu.org/licenses/>.

import base64
import binascii
import hashlib
import logging
import os
import re

import pytz

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PublicFormat
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509.oid import ExtensionOID

from django.conf import settings
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.encoding import force_str
from django.utils.text import slugify
from django.utils.translation import ugettext_lazy as _

from . import ca_settings
from .extensions import AuthorityInformationAccess
from .extensions import AuthorityKeyIdentifier
from .extensions import BasicConstraints
from .extensions import ExtendedKeyUsage
from .extensions import IssuerAlternativeName
from .extensions import KeyUsage
from .extensions import NameConstraints
from .extensions import OCSPNoCheck
from .extensions import PrecertificateSignedCertificateTimestamps
from .extensions import SubjectAlternativeName
from .extensions import SubjectKeyIdentifier
from .extensions import TLSFeature
from .extensions import UnrecognizedExtension
from .managers import CertificateAuthorityManager
from .managers import CertificateManager
from .querysets import CertificateAuthorityQuerySet
from .querysets import CertificateQuerySet
from .signals import post_revoke_cert
from .signals import pre_revoke_cert
from .subject import Subject
from .utils import add_colons
from .utils import ca_storage
from .utils import format_general_names
from .utils import format_name
from .utils import get_extension_name
from .utils import int_to_hex
from .utils import multiline_url_validator
from .utils import read_file

log = logging.getLogger(__name__)

if ca_settings.CRYPTOGRAPHY_HAS_PRECERT_POISON:  # pragma: no branch, pragma: only cryptography>=2.4
    from .extensions import PrecertPoison


[docs]class Watcher(models.Model): name = models.CharField(max_length=64, null=True, blank=True, verbose_name=_('CommonName')) mail = models.EmailField(verbose_name=_('E-Mail'), unique=True) @classmethod def from_addr(cls, addr): name = None match = re.match(r'(.*?)\s*<(.*)>', addr) if match is not None: name, addr = match.groups() try: w = cls.objects.get(mail=addr) if w.name != name: w.name = name w.save() except cls.DoesNotExist: w = cls(mail=addr, name=name) w.full_clean() w.save() return w def __str__(self): if self.name: return '%s <%s>' % (self.name, self.mail) return self.mail
[docs]class X509CertMixin(models.Model): # reasons are defined in http://www.ietf.org/rfc/rfc3280.txt REVOCATION_REASONS = ( ('', _('No reason')), ('aa_compromise', _('Attribute Authority compromised')), ('affiliation_changed', _('Affiliation changed')), ('ca_compromise', _('CA compromised')), ('certificate_hold', _('On Hold')), ('cessation_of_operation', _('Cessation of operation')), ('key_compromise', _('Key compromised')), ('privilege_withdrawn', _('Privilege withdrawn')), ('remove_from_crl', _('Removed from CRL')), ('superseded', _('Superseded')), ('unspecified', _('Unspecified')), ) created = models.DateTimeField(auto_now=True) valid_from = models.DateTimeField(blank=False) expires = models.DateTimeField(null=False, blank=False) pub = models.TextField(verbose_name=_('Public key')) cn = models.CharField(max_length=128, verbose_name=_('CommonName')) serial = models.CharField(max_length=64, unique=True) # revocation information revoked = models.BooleanField(default=False) revoked_date = models.DateTimeField(null=True, blank=True, verbose_name=_('Revoked on')) revoked_reason = models.CharField( max_length=32, null=True, blank=True, verbose_name=_('Reason for revokation'), choices=REVOCATION_REASONS) _x509 = None class Meta: abstract = True
[docs] def get_revocation_reason(self): # pragma: only cryptography>=2.4 """Get the revocation reason of this certificate. Note that this method is only used by cryptography>=2.4. """ if self.revoked is False: return if self.revoked_reason == '' or self.revoked_reason is None: return x509.ReasonFlags.unspecified else: return getattr(x509.ReasonFlags, self.revoked_reason)
[docs] def get_revocation_time(self): """Get the revocation time as naive datetime. Note that this method is only used by cryptography>=2.4. """ if self.revoked is False: return if timezone.is_aware(self.revoked_date): # convert datetime object to UTC and make it naive return timezone.make_naive(self.revoked_date, pytz.utc) return self.revoked_date
@property def x509(self): """The underlying :py:class:`cg:cryptography.x509.Certificate`.""" if self._x509 is None: backend = default_backend() self._x509 = x509.load_pem_x509_certificate(force_bytes(self.pub), backend) return self._x509 @x509.setter def x509(self, value): self._x509 = value self.pub = force_str(self.dump_certificate(Encoding.PEM)) self.cn = self.subject.get('CN', '') self.expires = self.not_after self.valid_from = self.not_before if settings.USE_TZ: self.expires = timezone.make_aware(self.expires, timezone=pytz.utc) self.valid_from = timezone.make_aware(self.valid_from, timezone=pytz.utc) self.serial = int_to_hex(value.serial_number) @property def admin_change_url(self): return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.verbose_name), args=(self.pk, )) ########################## # Certificate properties # ########################## @property def algorithm(self): return self.x509.signature_hash_algorithm def dump_certificate(self, encoding=Encoding.PEM): return self.x509.public_bytes(encoding=encoding) def get_digest(self, algo): algo = getattr(hashes, algo.upper())() return add_colons(binascii.hexlify(self.x509.fingerprint(algo)).upper().decode('utf-8')) def get_filename(self, ext, bundle=False): slug = slugify(self.cn.replace('.', '_')) if bundle is True: return '%s_bundle.%s' % (slug, ext.lower()) else: return '%s.%s' % (slug, ext.lower()) def get_revocation(self): if self.revoked is False: raise ValueError('Certificate is not revoked.') revoked_cert = x509.RevokedCertificateBuilder().serial_number( self.x509.serial_number).revocation_date(self.revoked_date) if self.revoked_reason: reason_flag = getattr(x509.ReasonFlags, self.revoked_reason) revoked_cert = revoked_cert.add_extension(x509.CRLReason(reason_flag), critical=False) return revoked_cert.build(default_backend()) @property def hpkp_pin(self): # taken from https://github.com/luisgf/hpkp-python/blob/master/hpkp.py public_key_raw = self.x509.public_key().public_bytes( encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo) public_key_hash = hashlib.sha256(public_key_raw).digest() return base64.b64encode(public_key_hash).decode('utf-8') @property def issuer(self): """The certificate issuer field as :py:class:`~django_ca.subject.Subject`.""" return Subject([(s.oid, s.value) for s in self.x509.issuer]) @property def not_before(self): """Date/Time this certificate was created""" return self.x509.not_valid_before @property def not_after(self): """Date/Time this certificate expires.""" return self.x509.not_valid_after @property def ocsp_status(self): # NOTE: The OCSP status 'good' does not say if the certificate has expired. if self.revoked is False: return 'good' return self.revoked_reason or 'revoked' def revoke(self, reason=None): pre_revoke_cert.send(sender=self.__class__, cert=self, reason=reason) self.revoked = True self.revoked_date = timezone.now() self.revoked_reason = reason self.save() post_revoke_cert.send(sender=self.__class__, cert=self) @property def subject(self): """The certificates subject as :py:class:`~django_ca.subject.Subject`.""" return Subject([(s.oid, s.value) for s in self.x509.subject]) def distinguishedName(self): return format_name(self.x509.subject) distinguishedName.short_description = 'Distinguished Name' ################### # X509 extensions # ################### OID_MAPPING = { ExtensionOID.AUTHORITY_INFORMATION_ACCESS: 'authority_information_access', ExtensionOID.AUTHORITY_KEY_IDENTIFIER: 'authority_key_identifier', ExtensionOID.BASIC_CONSTRAINTS: 'basic_constraints', ExtensionOID.EXTENDED_KEY_USAGE: 'extended_key_usage', ExtensionOID.ISSUER_ALTERNATIVE_NAME: 'issuer_alternative_name', ExtensionOID.KEY_USAGE: 'key_usage', ExtensionOID.NAME_CONSTRAINTS: 'name_constraints', ExtensionOID.OCSP_NO_CHECK: 'ocsp_no_check', ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS: 'precertificate_signed_certificate_timestamps', ExtensionOID.SUBJECT_ALTERNATIVE_NAME: 'subject_alternative_name', ExtensionOID.SUBJECT_KEY_IDENTIFIER: 'subject_key_identifier', ExtensionOID.TLS_FEATURE: 'tls_feature', } if ca_settings.CRYPTOGRAPHY_HAS_PRECERT_POISON: # pragma: no branch, pragma: only cryptography>=2.4 OID_MAPPING[ExtensionOID.PRECERT_POISON] = 'precert_poison' @property def _sorted_extensions(self): return sorted(self.x509.extensions, key=lambda e: (get_extension_name(e), e.oid.dotted_string)) def get_extension_fields(self): for ext in self._sorted_extensions: if ext.oid in self.OID_MAPPING: yield self.OID_MAPPING[ext.oid] # extension that does not support new extension framework else: name = ext.oid._name.replace(' ', '') if hasattr(self, name): yield name elif name == 'cRLDistributionPoints': yield 'cRLDistributionPoints' else: log.warning('Unknown extension encountered: %s (%s)', get_extension_name(ext), ext.oid.dotted_string) yield ext def get_extensions(self): for ext in self._sorted_extensions: if ext.oid in self.OID_MAPPING: yield getattr(self, self.OID_MAPPING[ext.oid]) # extension that does not support new extension framework else: name = ext.oid._name.replace(' ', '') if hasattr(self, name): value = getattr(self, name)() yield name, value elif name == 'cRLDistributionPoints': yield name, self.crlDistributionPoints() else: # pragma: no cover - we have a function for everything we support yield name, (ext.critical, ext.oid) @property def authority_information_access(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS) except x509.ExtensionNotFound: return None return AuthorityInformationAccess(ext) @property def authority_key_identifier(self): """The :py:class:`~django_ca.extensions.AuthorityKeyIdentifier` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_KEY_IDENTIFIER) except x509.ExtensionNotFound: return None return AuthorityKeyIdentifier(ext) @property def basic_constraints(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS) except x509.ExtensionNotFound: return None return BasicConstraints(ext) @property def issuer_alternative_name(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.ISSUER_ALTERNATIVE_NAME) except x509.ExtensionNotFound: return None return IssuerAlternativeName(ext) @property def key_usage(self): """The :py:class:`~django_ca.extensions.KeyUsage` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) except x509.ExtensionNotFound: return None return KeyUsage(ext) @property def extended_key_usage(self): """The :py:class:`~django_ca.extensions.ExtendedKeyUsage` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE) except x509.ExtensionNotFound: return None return ExtendedKeyUsage(ext) @property def name_constraints(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.NAME_CONSTRAINTS) except x509.ExtensionNotFound: return None return NameConstraints(ext) @property def ocsp_no_check(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.OCSP_NO_CHECK) except x509.ExtensionNotFound: return None return OCSPNoCheck(ext) @property def precert_poison(self): # pragma: only cryptography>=2.4 try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.PRECERT_POISON) except x509.ExtensionNotFound: return None return PrecertPoison(ext) @property def precertificate_signed_certificate_timestamps(self): try: ext = self.x509.extensions.get_extension_for_oid( ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS) except x509.ExtensionNotFound: return None if isinstance(ext.value, x509.UnrecognizedExtension): # Older versions of OpenSSL (and LibreSSL) cannot parse this extension # see https://github.com/pyca/cryptography/blob/master/tests/x509/test_x509_ext.py#L4455-L4459 return UnrecognizedExtension( ext, name=get_extension_name(ext), error='Requires OpenSSL 1.1.0f or later') else: # pragma: only SCT return PrecertificateSignedCertificateTimestamps(ext) @property def subject_alternative_name(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) except x509.ExtensionNotFound: return None return SubjectAlternativeName(ext) @property def subject_key_identifier(self): """The :py:class:`~django_ca.extensions.SubjectKeyIdentifier` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER) except x509.ExtensionNotFound: return None return SubjectKeyIdentifier(ext) @property def tls_feature(self): """The :py:class:`~django_ca.extensions.TLSFeature` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.TLS_FEATURE) except x509.ExtensionNotFound: return None return TLSFeature(ext) ################################# # Old-style extension accessors # ################################# def certificatePolicies(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.CERTIFICATE_POLICIES) except x509.ExtensionNotFound: return None policies = [] for value in ext.value: output = 'OID %s: ' % value.policy_identifier.dotted_string if value.policy_qualifiers is None: output += "None" else: output += ', '.join([self._parse_policy_qualifier(p) for p in value.policy_qualifiers]) policies.append(output) return ext.critical, policies def crlDistributionPoints(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.CRL_DISTRIBUTION_POINTS) except x509.ExtensionNotFound: return None value = [] for dp in ext.value: if dp.full_name: value.append('Full Name: %s' % format_general_names(dp.full_name)) else: # pragma: no cover - not really used in the wild value.append('Relative Name: %s' % format_name(dp.relative_name.value)) return ext.critical, value def _parse_policy_qualifier(self, qualifier): if isinstance(qualifier, x509.extensions.UserNotice): # https://tools.ietf.org/html/rfc5280#section-4.2.1.4 notice_ref = qualifier.notice_reference text = qualifier.explicit_text if notice_ref is None: return text else: # pragma: no cover - unseen in the wild org = notice_ref.organization numbers = notice_ref.notice_numbers if not numbers: return '%s (Reference: %s)' % (text, org) elif len(numbers) == 1: return '%s (Reference: %s, number %s)' % (text, org, numbers[0]) else: return '%s (Reference: %s, numbers %s)' % (text, org, ', '.join(numbers)) return qualifier
[docs]class CertificateAuthority(X509CertMixin): objects = CertificateAuthorityManager.from_queryset(CertificateAuthorityQuerySet)() name = models.CharField(max_length=32, help_text=_('A human-readable name'), unique=True) """Human-readable name of the CA, only used for displaying the CA.""" enabled = models.BooleanField(default=True) parent = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, related_name='children') private_key_path = models.CharField(max_length=256, help_text=_('Path to the private key.')) # various details used when signing certs crl_url = models.TextField(blank=True, null=True, validators=[multiline_url_validator], verbose_name=_('CRL URLs'), help_text=_("URLs, one per line, where you can retrieve the CRL.")) issuer_url = models.URLField(blank=True, null=True, verbose_name=_('Issuer URL'), help_text=_("URL to the certificate of this CA (in DER format).")) ocsp_url = models.URLField(blank=True, null=True, verbose_name=_('OCSP responder URL'), help_text=_("URL of a OCSP responser for the CA.")) issuer_alt_name = models.CharField(blank=True, max_length=255, default='', verbose_name=_('issuerAltName'), help_text=_("URL for your CA.")) _key = None def key(self, password): if self._key is None: if os.path.isabs(self.private_key_path): log.warning('%s: CA uses absolute path. Use "manage.py migrate_ca" to update.', self.serial) key_data = read_file(self.private_key_path) self._key = load_pem_private_key(key_data, password, default_backend()) return self._key @property def key_exists(self): if self._key is not None: return True elif os.path.isabs(self.private_key_path): log.warning('%s: CA uses absolute path. Use "manage.py migrate_ca" to update.', self.serial) return os.path.exists(self.private_key_path) else: return ca_storage.exists(self.private_key_path)
[docs] def get_authority_key_identifier(self): """Return the AuthorityKeyIdentifier extension used in certificates signed by this CA.""" try: ski = self.x509.extensions.get_extension_for_class(x509.SubjectKeyIdentifier) except x509.ExtensionNotFound: return x509.AuthorityKeyIdentifier.from_issuer_public_key(self.x509.public_key()) else: return x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ski)
@property def pathlen(self): """The ``pathlen`` attribute of the ``BasicConstraints`` extension (either an ``int`` or ``None``).""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS) except x509.ExtensionNotFound: # pragma: no cover - extension should always be present return None return ext.value.path_length @property def max_pathlen(self): """The maximum pathlen for any intermediate CAs signed by this CA. This value is either ``None``, if this and all parent CAs don't have a ``pathlen`` attribute, or an ``int`` if any parent CA has the attribute. """ pathlen = self.pathlen if self.parent is None: return pathlen max_parent = self.parent.max_pathlen if max_parent is None: return pathlen elif pathlen is None: return max_parent - 1 else: return min(self.pathlen, max_parent - 1) @property def allows_intermediate_ca(self): """Wether this CA allows creating intermediate CAs.""" max_pathlen = self.max_pathlen return max_pathlen is None or max_pathlen > 0 @property def bundle(self): """A list of any parent CAs, including this CA. The list is ordered so the Root CA will be the first. """ ca = self bundle = [ca] while ca.parent is not None: bundle.append(ca.parent) ca = ca.parent return bundle class Meta: verbose_name = _('Certificate Authority') verbose_name_plural = _('Certificate Authorities') def __str__(self): return self.name
[docs]class Certificate(X509CertMixin): objects = CertificateManager.from_queryset(CertificateQuerySet)() watchers = models.ManyToManyField(Watcher, related_name='certificates', blank=True) ca = models.ForeignKey(CertificateAuthority, on_delete=models.CASCADE, verbose_name=_('Certificate Authority')) csr = models.TextField(verbose_name=_('CSR'), blank=True) @property def bundle(self): """The complete certificate bundle. This includes all CAs as well as the certificates itself.""" return [self] + self.ca.bundle def __str__(self): return self.cn