# -*- coding: utf-8 -*-
#
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not,
# see <http://www.gnu.org/licenses/>.
import base64
import binascii
import hashlib
import re
from collections import OrderedDict
from datetime import datetime
from datetime import timedelta
import pytz
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PublicFormat
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import TLSFeatureType
from cryptography.x509.oid import AuthorityInformationAccessOID
from cryptography.x509.oid import ExtensionOID
from django.conf import settings
from django.db import models
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.encoding import force_str
from django.utils.translation import ugettext_lazy as _
from . import ca_settings
from .managers import CertificateAuthorityManager
from .managers import CertificateManager
from .querysets import CertificateAuthorityQuerySet
from .querysets import CertificateQuerySet
from .signals import post_revoke_cert
from .signals import pre_revoke_cert
from .utils import EXTENDED_KEY_USAGE_REVERSED
from .utils import KEY_USAGE_MAPPING
from .utils import OID_NAME_MAPPINGS
from .utils import add_colons
from .utils import format_general_name
from .utils import format_general_names
from .utils import format_name
from .utils import int_to_hex
from .utils import multiline_url_validator
class Watcher(models.Model):
    """A person (optional name plus unique e-mail address) that can be attached to certificates."""

    name = models.CharField(max_length=64, null=True, blank=True, verbose_name=_('CommonName'))
    mail = models.EmailField(verbose_name=_('E-Mail'), unique=True)

    @classmethod
    def from_addr(cls, addr):
        """Return the watcher for ``addr``, creating or renaming it as needed.

        ``addr`` is either a bare address or has the form ``Name <address>``. An existing
        watcher with the same address is reused and its name is overwritten if it differs
        from the parsed one (a bare address yields the name ``None``). Otherwise a new
        watcher is validated with ``full_clean()`` and saved.
        """
        parsed = re.match(r'(.*?)\s*<(.*)>', addr)
        if parsed is None:
            name = None
        else:
            name, addr = parsed.groups()

        try:
            watcher = cls.objects.get(mail=addr)
            if watcher.name != name:
                watcher.name = name
                watcher.save()
        except cls.DoesNotExist:
            watcher = cls(mail=addr, name=name)
            watcher.full_clean()
            watcher.save()
        return watcher

    def __str__(self):
        # Without a (non-empty) name, the address alone identifies the watcher.
        if not self.name:
            return self.mail
        return '%s <%s>' % (self.name, self.mail)
class X509CertMixin(models.Model):
    """Abstract base model holding a PEM-encoded x509 certificate plus revocation state.

    The parsed certificate is available as :py:attr:`x509`. The methods named after x509
    extensions (``subjectAltName()``, ``keyUsage()``, ...) each return ``None`` if the
    certificate does not carry that extension, or a tuple ``(critical, value)`` where
    ``value`` is a printable representation of the extension.
    """

    # reasons are defined in http://www.ietf.org/rfc/rfc3280.txt
    REVOCATION_REASONS = (
        ('', _('No reason')),
        ('aa_compromise', _('Attribute Authority compromised')),
        ('affiliation_changed', _('Affiliation changed')),
        ('ca_compromise', _('CA compromised')),
        ('certificate_hold', _('On Hold')),
        ('cessation_of_operation', _('Cessation of operation')),
        ('key_compromise', _('Key compromised')),
        ('privilege_withdrawn', _('Privilege withdrawn')),
        ('remove_from_crl', _('Removed from CRL')),
        ('superseded', _('Superseded')),
        ('unspecified', _('Unspecified')),
    )

    # NOTE(review): auto_now=True refreshes this timestamp on every save() (e.g. on revoke()),
    # not only on creation - confirm whether auto_now_add was intended. Left unchanged here
    # because altering the field would require a migration.
    created = models.DateTimeField(auto_now=True)
    expires = models.DateTimeField(null=False, blank=False)

    pub = models.TextField(verbose_name=_('Public key'))
    cn = models.CharField(max_length=128, verbose_name=_('CommonName'))
    serial = models.CharField(max_length=64, unique=True)

    # revocation information
    revoked = models.BooleanField(default=False)
    revoked_date = models.DateTimeField(null=True, blank=True, verbose_name=_('Revoked on'))
    revoked_reason = models.CharField(
        max_length=32, null=True, blank=True, verbose_name=_('Reason for revokation'),
        choices=REVOCATION_REASONS)

    # per-instance cache for the parsed certificate, see the x509 property
    _x509 = None

    @property
    def x509(self):
        """The certificate parsed from ``self.pub`` (PEM); parsed on first access, then cached."""
        if self._x509 is None:
            backend = default_backend()
            self._x509 = x509.load_pem_x509_certificate(force_bytes(self.pub), backend)
        return self._x509

    @x509.setter
    def x509(self, value):
        """Store a parsed certificate and derive ``pub``, ``cn``, ``expires`` and ``serial`` from it."""
        self._x509 = value
        self.pub = force_str(self.dump_certificate(Encoding.PEM))
        self.cn = self.subject['CN']
        self.expires = self.not_after
        if settings.USE_TZ:
            # not_after is made timezone-aware (as UTC) when Django runs with USE_TZ
            self.expires = timezone.make_aware(self.expires, timezone=pytz.utc)

        self.serial = int_to_hex(value.serial_number)

    @property
    def subject(self):
        """Subject attributes as an ordered mapping of short name (via OID_NAME_MAPPINGS) to value."""
        return OrderedDict([(OID_NAME_MAPPINGS[s.oid], s.value) for s in self.x509.subject])

    @property
    def issuer(self):
        """Issuer attributes as an ordered mapping of short name (via OID_NAME_MAPPINGS) to value."""
        return OrderedDict([(OID_NAME_MAPPINGS[s.oid], s.value) for s in self.x509.issuer])

    @property
    def not_before(self):
        """The ``not_valid_before`` timestamp of the certificate."""
        return self.x509.not_valid_before

    @property
    def not_after(self):
        """The ``not_valid_after`` timestamp of the certificate."""
        return self.x509.not_valid_after

    def extensions(self):
        """Yield ``(name, value)`` for every extension of the certificate, sorted by OID name.

        ``value`` is produced by the method of the same name on this object if there is one,
        otherwise it is the plain string form of the extension value.
        """
        for ext in sorted(self.x509.extensions, key=lambda e: e.oid._name):
            name = ext.oid._name
            if hasattr(self, name):
                yield name, getattr(self, name)()
            elif name == 'cRLDistributionPoints':
                # the OID name starts with a lowercase "c", the method name does not
                yield name, self.crlDistributionPoints()
            else:  # pragma: no cover - we have a function for everything we support
                yield name, str(ext.value)

    def distinguishedName(self):
        """Return the certificate subject formatted by ``format_name``."""
        return format_name(self.x509.subject)
    distinguishedName.short_description = 'Distinguished Name'

    def subjectAltName(self):
        """Return ``(critical, [formatted names])`` or ``None`` if the extension is absent."""
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
        except x509.ExtensionNotFound:
            return None

        return ext.critical, [format_general_name(name) for name in ext.value]

    def crlDistributionPoints(self):
        """Return ``(critical, [one string per distribution point])`` or ``None`` if absent."""
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.CRL_DISTRIBUTION_POINTS)
        except x509.ExtensionNotFound:
            return None

        value = []
        for dp in ext.value:
            if dp.full_name:
                value.append('Full Name: %s' % format_general_names(dp.full_name))
            else:  # pragma: no cover - not really used in the wild
                value.append('Relative Name: %s' % format_name(dp.relative_name.value))

        return ext.critical, value

    def authorityInfoAccess(self):
        """Return ``(critical, [one string per access description])`` or ``None`` if absent."""
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
        except x509.ExtensionNotFound:  # pragma: no cover - extension should always be present
            return None

        output = []
        for desc in ext.value:
            if desc.access_method == AuthorityInformationAccessOID.OCSP:
                output.append('OCSP - %s' % format_general_name(desc.access_location))
            elif desc.access_method == AuthorityInformationAccessOID.CA_ISSUERS:
                output.append('CA Issuers - %s' % format_general_name(desc.access_location))
            else:  # pragma: no cover - nothing else is known currently.
                output.append('Unknown')

        return ext.critical, output

    def basicConstraints(self):
        """Return ``(critical, 'CA:TRUE|CA:FALSE[, pathlen:N]')`` or ``None`` if absent."""
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS)
        except x509.ExtensionNotFound:  # pragma: no cover - extension should always be present
            return None

        if ext.value.ca is True:
            value = 'CA:TRUE'
        else:
            value = 'CA:FALSE'
        if ext.value.path_length is not None:
            value = '%s, pathlen:%s' % (value, ext.value.path_length)

        return ext.critical, value

    def keyUsage(self):
        """Return ``(critical, sorted list of set usages)`` or ``None`` if absent.

        The usage names are the keys of ``KEY_USAGE_MAPPING``.
        """
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
        except x509.ExtensionNotFound:
            return None

        usages = []
        for key, value in KEY_USAGE_MAPPING.items():
            try:
                if getattr(ext.value, value):
                    usages.append(key)
            except ValueError:
                # Deliberately ignored: cryptography raises ValueError when reading
                # encipher_only/decipher_only while key_agreement is not set, which
                # simply means the usage is not set.
                pass

        return ext.critical, sorted(usages)

    def extendedKeyUsage(self):
        """Return ``(critical, [usage names])`` or ``None`` if absent.

        Names are looked up in ``EXTENDED_KEY_USAGE_REVERSED``.
        """
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE)
        except x509.ExtensionNotFound:
            return None

        return ext.critical, [EXTENDED_KEY_USAGE_REVERSED[u] for u in ext.value]

    def subjectKeyIdentifier(self):
        """Return ``(critical, colon-separated uppercase hex digest)`` or ``None`` if absent."""
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_KEY_IDENTIFIER)
        except x509.ExtensionNotFound:  # pragma: no cover - extension should always be present
            return None

        hexlified = binascii.hexlify(ext.value.digest).upper().decode('utf-8')
        return ext.critical, add_colons(hexlified)

    def issuerAltName(self):
        """Return ``(critical, formatted names)`` or ``None`` if absent."""
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.ISSUER_ALTERNATIVE_NAME)
        except x509.ExtensionNotFound:
            return None

        return ext.critical, format_general_names(ext.value)

    def authorityKeyIdentifier(self):
        """Return ``(critical, 'keyid:<colon-separated hex>')`` or ``None`` if absent.

        If the extension carries no key identifier (it may identify the issuer by name and
        serial instead), the plain string form of the extension value is returned as value.
        """
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
        except x509.ExtensionNotFound:  # pragma: no cover - extension should always be present
            return None

        key_identifier = ext.value.key_identifier
        if key_identifier is None:
            # hexlify(None) would raise TypeError; use the same generic representation
            # that extensions() uses for values it has no formatter for.
            return ext.critical, str(ext.value)

        hexlified = binascii.hexlify(key_identifier).upper().decode('utf-8')
        return ext.critical, 'keyid:%s' % add_colons(hexlified)

    def TLSFeature(self):
        """Return ``(critical, [feature descriptions])`` or ``None`` if absent."""
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.TLS_FEATURE)
        except x509.ExtensionNotFound:
            return None

        features = []
        for feature in ext.value:
            if feature == TLSFeatureType.status_request:
                features.append('OCSP Must-Staple')
            elif feature == TLSFeatureType.status_request_v2:
                features.append('Multiple Certificate Status Request')
            else:  # pragma: no cover - all features of cryptography 2.1 are covered
                features.append('Unknown TLS Feature')

        return ext.critical, features

    def get_digest(self, algo):
        """Return the certificate fingerprint as colon-separated uppercase hex.

        ``algo`` is the name of a hash class in ``cryptography.hazmat.primitives.hashes``;
        it is upper-cased before the lookup, so e.g. ``'sha256'`` works.
        """
        algo = getattr(hashes, algo.upper())()
        return add_colons(binascii.hexlify(self.x509.fingerprint(algo)).upper().decode('utf-8'))

    @property
    def hpkp_pin(self):
        """Base64-encoded SHA-256 hash of the DER-encoded SubjectPublicKeyInfo of the certificate."""
        # taken from https://github.com/luisgf/hpkp-python/blob/master/hpkp.py

        public_key_raw = self.x509.public_key().public_bytes(
            encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo)
        public_key_hash = hashlib.sha256(public_key_raw).digest()
        return base64.b64encode(public_key_hash).decode('utf-8')

    def dump_certificate(self, encoding=Encoding.PEM):
        """Return the certificate serialized as bytes in the given ``encoding``."""
        return self.x509.public_bytes(encoding=encoding)

    def revoke(self, reason=None):
        """Mark the certificate as revoked and save it.

        ``pre_revoke_cert`` is sent before any change is made, ``post_revoke_cert`` after
        the object was saved.
        """
        pre_revoke_cert.send(sender=self.__class__, cert=self, reason=reason)

        self.revoked = True
        self.revoked_date = timezone.now()
        self.revoked_reason = reason
        self.save()

        post_revoke_cert.send(sender=self.__class__, cert=self)

    def get_revocation(self):
        """Build the revoked-certificate entry for this certificate (e.g. for use in a CRL).

        The entry carries the serial number and revocation date, plus a non-critical
        CRLReason extension if a revocation reason is set.

        Raises ``ValueError`` if the certificate is not revoked.
        """
        if self.revoked is False:
            raise ValueError('Certificate is not revoked.')

        revoked_cert = x509.RevokedCertificateBuilder().serial_number(
            self.x509.serial_number).revocation_date(self.revoked_date)

        if self.revoked_reason:
            # the stored reason doubles as the attribute name on x509.ReasonFlags
            reason_flag = getattr(x509.ReasonFlags, self.revoked_reason)
            revoked_cert = revoked_cert.add_extension(x509.CRLReason(reason_flag), critical=False)

        return revoked_cert.build(default_backend())

    @property
    def ocsp_status(self):
        """``'good'`` if not revoked, otherwise the revocation reason or ``'revoked'`` if none is set."""
        # NOTE: The OCSP status 'good' does not say if the certificate has expired.
        if self.revoked is False:
            return 'good'

        return self.revoked_reason or 'revoked'

    class Meta:
        abstract = True
class CertificateAuthority(X509CertMixin):
    """A certificate authority: an x509 certificate plus the location of its private key."""

    objects = CertificateAuthorityManager.from_queryset(CertificateAuthorityQuerySet)()

    name = models.CharField(max_length=32, help_text=_('A human-readable name'), unique=True)
    enabled = models.BooleanField(default=True)
    # WARNING: on_delete MUST be a keyword argument in Django 1.8.
    parent = models.ForeignKey('self', on_delete=models.SET_NULL, null=True, blank=True,
                               related_name='children')
    private_key_path = models.CharField(max_length=256, help_text=_('Path to the private key.'))

    # various details used when signing certs
    crl_url = models.TextField(blank=True, null=True, validators=[multiline_url_validator],
                               verbose_name=_('CRL URLs'),
                               help_text=_("URLs, one per line, where you can retrieve the CRL."))
    issuer_url = models.URLField(blank=True, null=True, verbose_name=_('Issuer URL'),
                                 help_text=_("URL to the certificate of this CA (in DER format)."))
    ocsp_url = models.URLField(blank=True, null=True, verbose_name=_('OCSP responder URL'),
                               help_text=_("URL of a OCSP responser for the CA."))
    issuer_alt_name = models.URLField(blank=True, null=True, verbose_name=_('issuerAltName'),
                                      help_text=_("URL for your CA."))

    # per-instance cache for the loaded private key, see key()
    _key = None

    class Meta:
        verbose_name = _('Certificate Authority')
        verbose_name_plural = _('Certificate Authorities')

    def __str__(self):
        return self.name

    def key(self, password):
        """Return the private key loaded from ``private_key_path``.

        The PEM file is read and decrypted with ``password`` on the first call only; later
        calls return the cached key without looking at ``password``.
        """
        if self._key is not None:
            return self._key

        with open(self.private_key_path, 'rb') as stream:
            pem_data = stream.read()
        self._key = load_pem_private_key(pem_data, password, default_backend())
        return self._key

    @property
    def pathlen(self):
        """The ``path_length`` of this CA's BasicConstraints extension (may be ``None``)."""
        try:
            constraints = self.x509.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS)
        except x509.ExtensionNotFound:  # pragma: no cover - extension should always be present
            return None
        return constraints.value.path_length

    @property
    def max_pathlen(self):
        """The effective path length, also taking the chain of parent CAs into account.

        ``None`` means that neither this CA nor any of its parents sets a limit.
        """
        own_limit = self.pathlen
        if self.parent is None:
            return own_limit

        parent_limit = self.parent.max_pathlen
        if parent_limit is None:
            return own_limit

        # this CA itself uses up one level of what the parent chain allows
        inherited_limit = parent_limit - 1
        if own_limit is None:
            return inherited_limit
        return min(own_limit, inherited_limit)

    @property
    def allows_intermediate_ca(self):
        """Whether this CA allows creating intermediate CAs."""
        limit = self.max_pathlen
        if limit is None:
            return True
        return limit > 0

    def nameConstraints(self):
        """Return ``(critical, [formatted constraints])`` or ``None`` if the extension is absent.

        Permitted subtrees are listed first, followed by excluded subtrees.
        """
        try:
            ext = self.x509.extensions.get_extension_for_oid(ExtensionOID.NAME_CONSTRAINTS)
        except x509.ExtensionNotFound:
            return None

        permitted = ext.value.permitted_subtrees
        excluded = ext.value.excluded_subtrees

        constraints = []
        if permitted:
            constraints.extend('Permitted: %s' % format_general_name(entry) for entry in permitted)
        if excluded:
            constraints.extend('Excluded: %s' % format_general_name(entry) for entry in excluded)
        return ext.critical, constraints
class Certificate(X509CertMixin):
    """A certificate signed by a :py:class:`CertificateAuthority`."""

    objects = CertificateManager.from_queryset(CertificateQuerySet)()

    watchers = models.ManyToManyField(Watcher, related_name='certificates', blank=True)

    # WARNING: on_delete MUST be a keyword argument in Django 1.8.
    ca = models.ForeignKey(CertificateAuthority, on_delete=models.CASCADE,
                           verbose_name=_('Certificate Authority'))
    csr = models.TextField(verbose_name=_('CSR'), blank=True)

    def resign(self, **kwargs):
        """Request a new certificate for the same CA and CSR via ``Certificate.objects.init``.

        Every keyword argument is passed on to ``Certificate.objects.init``. Arguments that
        are not given default to the values of this certificate: the subject, subjectAltName,
        keyUsage, extendedKeyUsage and TLS features (the latter four only if this certificate
        has the respective extension). ``algorithm`` defaults to
        ``ca_settings.CA_DIGEST_ALGORITHM`` and ``expires`` to today (UTC, midnight) plus
        ``ca_settings.CA_DEFAULT_EXPIRES`` days.

        Returns whatever ``Certificate.objects.init`` returns.
        """
        kwargs.setdefault('algorithm', ca_settings.CA_DIGEST_ALGORITHM)
        kwargs.setdefault('subject', self.subject)
        kwargs.setdefault('cn_in_san', False)  # this should already be the case

        # subjectAltName() returns None for certificates without that extension, so it must
        # not be subscripted blindly. Only consult the certificate if the caller passed
        # nothing, and pass no default at all if there is nothing to copy.
        if 'subjectAltName' not in kwargs:
            san = self.subjectAltName()
            if san is not None:
                kwargs['subjectAltName'] = san[1]

        now = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        expires = now + timedelta(days=ca_settings.CA_DEFAULT_EXPIRES)
        kwargs.setdefault('expires', expires)

        # Copy these extensions (criticality and raw value) if present on this certificate.
        copied_extensions = (
            ('extendedKeyUsage', ExtensionOID.EXTENDED_KEY_USAGE),
            ('keyUsage', ExtensionOID.KEY_USAGE),
            ('tls_features', ExtensionOID.TLS_FEATURE),
        )
        for kwarg, oid in copied_extensions:
            try:
                ext = self.x509.extensions.get_extension_for_oid(oid)
            except x509.ExtensionNotFound:
                continue
            kwargs.setdefault(kwarg, (ext.critical, ext.value))

        return Certificate.objects.init(self.ca, self.csr, **kwargs)

    def __str__(self):
        return self.cn