Source code for django_ca.models

# -*- coding: utf-8 -*-
#
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca.  If not,
# see <http://www.gnu.org/licenses/>.

import base64
import binascii
import hashlib
import re
from datetime import datetime
from datetime import timedelta

import pytz

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PublicFormat
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509.certificate_transparency import LogEntryType
from cryptography.x509.extensions import UnrecognizedExtension
from cryptography.x509.oid import AuthorityInformationAccessOID
from cryptography.x509.oid import ExtensionOID

from django.conf import settings
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.encoding import force_str
from django.utils.translation import ugettext_lazy as _

from . import ca_settings
from .extensions import AuthorityKeyIdentifier
from .extensions import ExtendedKeyUsage
from .extensions import KeyUsage
from .extensions import SubjectKeyIdentifier
from .extensions import TLSFeature
from .managers import CertificateAuthorityManager
from .managers import CertificateManager
from .querysets import CertificateAuthorityQuerySet
from .querysets import CertificateQuerySet
from .signals import post_revoke_cert
from .signals import pre_revoke_cert
from .subject import Subject
from .utils import add_colons
from .utils import format_general_name
from .utils import format_general_names
from .utils import format_name
from .utils import int_to_hex
from .utils import multiline_url_validator


class Watcher(models.Model):
    name = models.CharField(max_length=64, null=True, blank=True, verbose_name=_('CommonName'))
    mail = models.EmailField(verbose_name=_('E-Mail'), unique=True)

    @classmethod
    def from_addr(cls, addr):
        name = None
        match = re.match(r'(.*?)\s*<(.*)>', addr)
        if match is not None:
            name, addr = match.groups()

        try:
            w = cls.objects.get(mail=addr)
            if w.name != name:
                w.name = name
                w.save()
        except cls.DoesNotExist:
            w = cls(mail=addr, name=name)
            w.full_clean()
            w.save()

        return w

    def __str__(self):
        if self.name:
            return '%s <%s>' % (self.name, self.mail)
        return self.mail


[docs]class X509CertMixin(models.Model): # reasons are defined in http://www.ietf.org/rfc/rfc3280.txt REVOCATION_REASONS = ( ('', _('No reason')), ('aa_compromise', _('Attribute Authority compromised')), ('affiliation_changed', _('Affiliation changed')), ('ca_compromise', _('CA compromised')), ('certificate_hold', _('On Hold')), ('cessation_of_operation', _('Cessation of operation')), ('key_compromise', _('Key compromised')), ('privilege_withdrawn', _('Privilege withdrawn')), ('remove_from_crl', _('Removed from CRL')), ('superseded', _('Superseded')), ('unspecified', _('Unspecified')), ) created = models.DateTimeField(auto_now=True) expires = models.DateTimeField(null=False, blank=False) pub = models.TextField(verbose_name=_('Public key')) cn = models.CharField(max_length=128, verbose_name=_('CommonName')) serial = models.CharField(max_length=64, unique=True) # revocation information revoked = models.BooleanField(default=False) revoked_date = models.DateTimeField(null=True, blank=True, verbose_name=_('Revoked on')) revoked_reason = models.CharField( max_length=32, null=True, blank=True, verbose_name=_('Reason for revokation'), choices=REVOCATION_REASONS) _x509 = None class Meta: abstract = True @property def x509(self): """The underlying :py:class:`cryptography:cryptography.x509.Certificate`.""" if self._x509 is None: backend = default_backend() self._x509 = x509.load_pem_x509_certificate(force_bytes(self.pub), backend) return self._x509 @x509.setter def x509(self, value): self._x509 = value self.pub = force_str(self.dump_certificate(Encoding.PEM)) self.cn = self.subject.get('CN', '') self.expires = self.not_after if settings.USE_TZ: self.expires = timezone.make_aware(self.expires, timezone=pytz.utc) self.serial = int_to_hex(value.serial_number) @property def admin_change_url(self): return reverse('admin:%s_%s_change' % (self._meta.app_label, self._meta.verbose_name), args=(self.pk, )) ########################## # Certificate properties # ########################## @property def algorithm(self): return self.x509.signature_hash_algorithm def dump_certificate(self, encoding=Encoding.PEM): return self.x509.public_bytes(encoding=encoding) def get_digest(self, algo): algo = getattr(hashes, algo.upper())() return add_colons(binascii.hexlify(self.x509.fingerprint(algo)).upper().decode('utf-8')) def get_revocation(self): if self.revoked is False: raise ValueError('Certificate is not revoked.') revoked_cert = x509.RevokedCertificateBuilder().serial_number( self.x509.serial_number).revocation_date(self.revoked_date) if self.revoked_reason: reason_flag = getattr(x509.ReasonFlags, self.revoked_reason) revoked_cert = revoked_cert.add_extension(x509.CRLReason(reason_flag), critical=False) return revoked_cert.build(default_backend()) @property def hpkp_pin(self): # taken from https://github.com/luisgf/hpkp-python/blob/master/hpkp.py public_key_raw = self.x509.public_key().public_bytes( encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo) public_key_hash = hashlib.sha256(public_key_raw).digest() return base64.b64encode(public_key_hash).decode('utf-8') @property def issuer(self): """The certificate issuer field as :py:class:`~django_ca.subject.Subject`.""" return Subject([(s.oid, s.value) for s in self.x509.issuer]) @property def not_before(self): """Date/Time this certificate was created""" return self.x509.not_valid_before @property def not_after(self): """Date/Time this certificate expires.""" return self.x509.not_valid_after @property def ocsp_status(self): # NOTE: The OCSP status 'good' does not say if the certificate has expired. if self.revoked is False: return 'good' return self.revoked_reason or 'revoked' def revoke(self, reason=None): pre_revoke_cert.send(sender=self.__class__, cert=self, reason=reason) self.revoked = True self.revoked_date = timezone.now() self.revoked_reason = reason self.save() post_revoke_cert.send(sender=self.__class__, cert=self) @property def subject(self): """The certificates subject as :py:class:`~django_ca.subject.Subject`.""" return Subject([(s.oid, s.value) for s in self.x509.subject]) ################### # X509 extensions # ################### OID_MAPPING = { ExtensionOID.AUTHORITY_KEY_IDENTIFIER: 'authority_key_identifier', ExtensionOID.EXTENDED_KEY_USAGE: 'extended_key_usage', ExtensionOID.KEY_USAGE: 'key_usage', ExtensionOID.SUBJECT_KEY_IDENTIFIER: 'subject_key_identifier', ExtensionOID.TLS_FEATURE: 'tls_feature', } def get_extension_fields(self): for ext in sorted(self.x509.extensions, key=lambda e: e.oid._name.title()): if ext.oid in self.OID_MAPPING: yield self.OID_MAPPING[ext.oid] # extension that does not support new extension framework else: name = ext.oid._name.replace(' ', '') if hasattr(self, name): yield name elif name == 'cRLDistributionPoints': yield 'cRLDistributionPoints' else: # pragma: no cover - we have a function for everything we support #warnings.warn('Unknown extension encountered: %s' % ext.oid._name) yield ext def get_extensions(self): for ext in sorted(self.x509.extensions, key=lambda e: e.oid._name): if ext.oid in self.OID_MAPPING: yield getattr(self, self.OID_MAPPING[ext.oid]) # extension that does not support new extension framework else: name = ext.oid._name.replace(' ', '') if hasattr(self, name): value = getattr(self, name)() yield name, value elif name == 'cRLDistributionPoints': yield name, self.crlDistributionPoints() else: # pragma: no cover - we have a function for everything we support yield name, (ext.critical, ext.oid) @property def authority_key_identifier(self): """The :py:class:`~django_ca.extensions.AuthorityKeyIdentifier` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_KEY_IDENTIFIER) except x509.ExtensionNotFound: return None return AuthorityKeyIdentifier(ext) @property def key_usage(self): """The :py:class:`~django_ca.extensions.KeyUsage` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) except x509.ExtensionNotFound: return None return KeyUsage(ext) @property def extended_key_usage(self): """The :py:class:`~django_ca.extensions.ExtendedKeyUsage` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE) except x509.ExtensionNotFound: return None return ExtendedKeyUsage(ext) @property def subject_key_identifier(self): """The :py:class:`~django_ca.extensions.SubjectKeyIdentifier` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER) except x509.ExtensionNotFound: return None return SubjectKeyIdentifier(ext) @property def tls_feature(self): """The :py:class:`~django_ca.extensions.TLSFeature` extension, or ``None`` if it doesn't exist.""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.TLS_FEATURE) except x509.ExtensionNotFound: return None return TLSFeature(ext) ################################# # Old-style extension accessors # ################################# def authorityInfoAccess(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS) except x509.ExtensionNotFound: return None output = [] for desc in ext.value: if desc.access_method == AuthorityInformationAccessOID.OCSP: output.append('OCSP - %s' % format_general_name(desc.access_location)) elif desc.access_method == AuthorityInformationAccessOID.CA_ISSUERS: output.append('CA Issuers - %s' % format_general_name(desc.access_location)) else: # pragma: no cover - we don't know any other access methods output.append('Unknown') return ext.critical, output def basicConstraints(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS) except x509.ExtensionNotFound: return None if ext.value.ca is True: value = 'CA:TRUE' else: value = 'CA:FALSE' if ext.value.path_length is not None: value = '%s, pathlen:%s' % (value, ext.value.path_length) return ext.critical, value def certificatePolicies(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.CERTIFICATE_POLICIES) except x509.ExtensionNotFound: return None policies = [] for value in ext.value: output = 'OID %s: ' % value.policy_identifier.dotted_string if value.policy_qualifiers is None: output += "None" else: output += ', '.join([self._parse_policy_qualifier(p) for p in value.policy_qualifiers]) policies.append(output) return ext.critical, policies def crlDistributionPoints(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.CRL_DISTRIBUTION_POINTS) except x509.ExtensionNotFound: return None value = [] for dp in ext.value: if dp.full_name: value.append('Full Name: %s' % format_general_names(dp.full_name)) else: # pragma: no cover - not really used in the wild value.append('Relative Name: %s' % format_name(dp.relative_name.value)) return ext.critical, value def distinguishedName(self): return format_name(self.x509.subject) distinguishedName.short_description = 'Distinguished Name' def issuerAltName(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.ISSUER_ALTERNATIVE_NAME) except x509.ExtensionNotFound: return None return ext.critical, format_general_names(ext.value) def signedCertificateTimestampList(self): try: ext = self.x509.extensions.get_extension_for_oid( ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS) except x509.ExtensionNotFound: return None if isinstance(ext.value, UnrecognizedExtension): # Older versions of OpenSSL (and LibreSSL) cannot parse this extension # see https://github.com/pyca/cryptography/blob/master/tests/x509/test_x509_ext.py#L4455-L4459 return ext.critical, ['Parsing requires OpenSSL 1.1.0f+'] timestamps = [] for entry in ext.value: if entry.entry_type == LogEntryType.PRE_CERTIFICATE: entry_type = 'Precertificate' elif entry.entry_type == LogEntryType.X509_CERTIFICATE: # pragma: no cover - unseen in the wild # NOTE: same pragma is also in django_ca.admin.CertificateMixin.signedCertificateTimestampList entry_type = 'x509 certificate' else: # pragma: no cover - only the above two are part of the standard # NOTE: same pragma is also in django_ca.admin.CertificateMixin.signedCertificateTimestampList entry_type = 'unknown' timestamps.append('%s (%s): %s\n%s' % ( entry_type, entry.version.name, entry.timestamp, '\n%s' % binascii.hexlify(entry.log_id).decode('utf-8') )) return ext.critical, timestamps def subjectAltName(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) except x509.ExtensionNotFound: return None return ext.critical, [format_general_name(name) for name in ext.value] def _parse_policy_qualifier(self, qualifier): if isinstance(qualifier, x509.extensions.UserNotice): # https://tools.ietf.org/html/rfc5280#section-4.2.1.4 notice_ref = qualifier.notice_reference text = qualifier.explicit_text if notice_ref is None: return text else: # pragma: no cover - unseen in the wild org = notice_ref.organization numbers = notice_ref.notice_numbers if not numbers: return '%s (Reference: %s)' % (text, org) elif len(numbers) == 1: return '%s (Reference: %s, number %s)' % (text, org, numbers[0]) else: return '%s (Reference: %s, numbers %s)' % (text, org, ', '.join(numbers)) return qualifier
[docs]class CertificateAuthority(X509CertMixin): objects = CertificateAuthorityManager.from_queryset(CertificateAuthorityQuerySet)() name = models.CharField(max_length=32, help_text=_('A human-readable name'), unique=True) """Human-readable name of the CA, only used for displaying the CA.""" enabled = models.BooleanField(default=True) parent = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True, related_name='children') private_key_path = models.CharField(max_length=256, help_text=_('Path to the private key.')) # various details used when signing certs crl_url = models.TextField(blank=True, null=True, validators=[multiline_url_validator], verbose_name=_('CRL URLs'), help_text=_("URLs, one per line, where you can retrieve the CRL.")) issuer_url = models.URLField(blank=True, null=True, verbose_name=_('Issuer URL'), help_text=_("URL to the certificate of this CA (in DER format).")) ocsp_url = models.URLField(blank=True, null=True, verbose_name=_('OCSP responder URL'), help_text=_("URL of a OCSP responser for the CA.")) issuer_alt_name = models.URLField(blank=True, null=True, verbose_name=_('issuerAltName'), help_text=_("URL for your CA.")) _key = None def key(self, password): if self._key is None: with open(self.private_key_path, 'rb') as f: key_data = f.read() self._key = load_pem_private_key(key_data, password, default_backend()) return self._key @property def pathlen(self): """The ``pathlen`` attribute of the ``BasicConstraints`` extension (either an ``int`` or ``None``).""" try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS) except x509.ExtensionNotFound: # pragma: no cover - extension should always be present return None return ext.value.path_length @property def max_pathlen(self): """The maximum pathlen for any intermediate CAs signed by this CA. This value is either ``None``, if this and all parent CAs don't have a ``pathlen`` attribute, or an ``int`` if any parent CA has the attribute. """ pathlen = self.pathlen if self.parent is None: return pathlen max_parent = self.parent.max_pathlen if max_parent is None: return pathlen elif pathlen is None: return max_parent - 1 else: return min(self.pathlen, max_parent - 1) @property def allows_intermediate_ca(self): """Wether this CA allows creating intermediate CAs.""" max_pathlen = self.max_pathlen return max_pathlen is None or max_pathlen > 0 def nameConstraints(self): try: ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.NAME_CONSTRAINTS) except x509.ExtensionNotFound: return None value = [] if ext.value.permitted_subtrees: for general_name in ext.value.permitted_subtrees: value.append('Permitted: %s' % format_general_name(general_name)) if ext.value.excluded_subtrees: for general_name in ext.value.excluded_subtrees: value.append('Excluded: %s' % format_general_name(general_name)) return ext.critical, value @property def bundle(self): """A list of any parent CAs, including this CA. The list is ordered so the Root CA will be the first. """ ca = self bundle = [ca] while ca.parent is not None: bundle.append(ca.parent) ca = ca.parent return list(reversed(bundle)) class Meta: verbose_name = _('Certificate Authority') verbose_name_plural = _('Certificate Authorities') def __str__(self): return self.name
[docs]class Certificate(X509CertMixin): objects = CertificateManager.from_queryset(CertificateQuerySet)() watchers = models.ManyToManyField(Watcher, related_name='certificates', blank=True) ca = models.ForeignKey(CertificateAuthority, on_delete=models.CASCADE, verbose_name=_('Certificate Authority')) csr = models.TextField(verbose_name=_('CSR'), blank=True) @property def bundle(self): """The complete certificate bundle. This includes all CAs as well as the certificates itself.""" return self.ca.bundle + [self] def resign(self, **kwargs): # pragma: no cover - not used yet kwargs.setdefault('algorithm', ca_settings.CA_DIGEST_ALGORITHM) kwargs.setdefault('subject', self.subject) kwargs.setdefault('cn_in_san', False) # this should already be the case kwargs.setdefault('subjectAltName', self.subjectAltName()[1]) now = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) expires = now + timedelta(days=ca_settings.CA_DEFAULT_EXPIRES) kwargs.setdefault('expires', expires) try: ext_key_usage = self.x509.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE) kwargs.setdefault('extended_key_usage', (ext_key_usage.critical, ext_key_usage.value)) except x509.ExtensionNotFound: pass try: key_usage = self.x509.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE) kwargs.setdefault('key_usage', (key_usage.critical, key_usage.value)) except x509.ExtensionNotFound: pass try: tls_feature = self.x509.extensions.get_extension_for_oid(ExtensionOID.TLS_FEATURE) kwargs.setdefault('tls_feature', (tls_feature.critical, tls_feature.value)) except x509.ExtensionNotFound: pass return Certificate.objects.init(self.ca, self.csr, **kwargs) def __str__(self): return self.cn