# -*- coding: utf-8 -*-
#
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.
import base64
import logging
import os
from datetime import datetime
from datetime import timedelta
import asn1crypto
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding
from ocspbuilder import OCSPResponseBuilder
from oscrypto.asymmetric import load_certificate
from oscrypto.asymmetric import load_private_key
from django.core.cache import cache
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse
from django.http import HttpResponseServerError
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.encoding import force_bytes
from django.utils.encoding import force_text
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
from django.views.generic.detail import SingleObjectMixin
from django.views.generic.edit import UpdateView
from .crl import get_crl
from .forms import RevokeCertificateForm
from .models import Certificate
from .models import CertificateAuthority
from .utils import int_to_hex
log = logging.getLogger(__name__)
[docs]class CertificateRevocationListView(View, SingleObjectMixin):
"""Generic view that provides Certificate Revocation Lists (CRLs)."""
slug_field = 'serial'
slug_url_kwarg = 'serial'
queryset = CertificateAuthority.objects.all().prefetch_related('certificate_set')
password = None
"""Password used to load the private key of the certificate authority. If not set, the private key is
assumed to be unencrypted."""
# parameters for the CRL itself
type = Encoding.DER
"""Filetype for CRL."""
ca_crl = False
"""If set to ``True``, return a CRL for child CAs instead."""
expires = 600
"""CRL expires in this many seconds."""
digest = hashes.SHA512()
"""Digest used for generating the CRL."""
# header used in the request
content_type = None
"""Value of the Content-Type header used in the response. For CRLs in PEM format, use ``text/plain``."""
def get(self, request, serial):
cache_key = 'crl_%s_%s_%s' % (serial, self.type, self.digest.name)
if self.ca_crl is True:
cache_key += '_ca'
crl = cache.get(cache_key)
if crl is None:
ca = self.get_object()
crl = get_crl(ca, encoding=self.type, expires=self.expires, algorithm=self.digest,
password=self.password, ca_crl=self.ca_crl)
cache.set(cache_key, crl, self.expires)
content_type = self.content_type
if content_type is None:
if self.type == Encoding.DER:
content_type = 'application/pkix-crl'
elif self.type == Encoding.PEM:
content_type = 'text/plain'
else: # pragma: no cover
# DER/PEM are all known encoding types, so this shouldn't happen
return HttpResponseServerError()
return HttpResponse(crl, content_type=content_type)
class RevokeCertificateView(UpdateView):
admin_site = None
queryset = Certificate.objects.filter(revoked=False)
form_class = RevokeCertificateForm
template_name = 'django_ca/admin/certificate_revoke_form.html'
def dispatch(self, request, *args, **kwargs):
if not self.request.user.has_perm('django_ca.change_certificate'):
raise PermissionDenied
return super(RevokeCertificateView, self).dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
context = super(RevokeCertificateView, self).get_context_data(**kwargs)
context.update(self.admin_site.each_context(self.request))
context['opts'] = self.queryset.model._meta # required by breadcrumbs
return context
def form_valid(self, form):
reason = form.cleaned_data['revoked_reason'] or None
form.instance.revoke(reason=reason)
return super(RevokeCertificateView, self).form_valid(form)
def get_success_url(self):
meta = self.queryset.model._meta
return reverse('admin:%s_%s_change' % (meta.app_label, meta.verbose_name),
args=(self.object.pk, ))
[docs]class OCSPView(View):
"""View to provide an OCSP responder.
.. seealso::
This is heavily inspired by
https://github.com/threema-ch/ocspresponder/blob/master/ocspresponder/__init__.py.
"""
ca = None
"""The name or serial of your Certificate Authority."""
responder_key = None
"""Absolute path to the private key used for signing OCSP responses."""
responder_cert = None
"""Absolute path to the public key used for signing OCSP responses. May also be a serial identifying a
certificate from the database."""
expires = 600
"""Time in seconds that the responses remain valid. The default is 600 seconds or ten minutes."""
ca_ocsp = False
"""If set to ``True``, validate child CAs instead."""
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super(OCSPView, self).dispatch(*args, **kwargs)
def get(self, request, data):
return self.process_ocsp_request(base64.b64decode(data))
def post(self, request):
return self.process_ocsp_request(request.body)
def fail(self, reason):
builder = OCSPResponseBuilder(response_status=reason)
return builder.build()
def process_ocsp_request(self, data):
status = 200
try:
response = self.get_ocsp_response(data)
except Exception as e: # pragma: no cover
# all exceptions in the function should be covered.
log.exception(e)
response = self.fail(u'internal_error')
status = 500
return HttpResponse(response.dump(), status=status,
content_type='application/ocsp-response')
def get_responder_key(self):
with open(self.responder_key, 'rb') as stream:
responder_key = stream.read()
# try to load responder key and cert with oscrypto, to make sure they are actually usable
return load_private_key(responder_key)
def get_responder_cert(self):
if os.path.exists(self.responder_cert):
with open(self.responder_cert, 'rb') as stream:
responder_cert = stream.read()
else:
responder_cert = Certificate.objects.get(serial=self.responder_cert)
return load_certificate(responder_cert)
def get_ocsp_response(self, data):
try:
ocsp_request = asn1crypto.ocsp.OCSPRequest.load(data)
tbs_request = ocsp_request['tbs_request']
request_list = tbs_request['request_list']
if len(request_list) != 1:
log.error('Received OCSP request with multiple sub requests')
raise NotImplemented('Combined requests not yet supported')
single_request = request_list[0] # TODO: Support more than one request
req_cert = single_request['req_cert']
serial = int_to_hex(req_cert['serial_number'].native)
except Exception as e:
log.exception('Error parsing OCSP request: %s', e)
return self.fail(u'malformed_request')
# Get CA and certificate
try:
ca = CertificateAuthority.objects.get_by_serial_or_cn(self.ca)
except CertificateAuthority.DoesNotExist:
log.error('%s: Certificate Authority could not be found.')
return self.fail(u'internal_error')
if self.ca_ocsp is True:
try:
cert = CertificateAuthority.objects.filter(parent=ca).get(serial=serial)
except CertificateAuthority.DoesNotExist:
log.warn('OCSP request for unknown CA received.')
return self.fail(u'internal_error')
else:
try:
cert = Certificate.objects.filter(ca=ca).get(serial=serial)
except Certificate.DoesNotExist:
log.warn('OCSP request for unknown cert received.')
return self.fail(u'internal_error')
# load ca cert and responder key/cert
try:
ca_cert = load_certificate(force_bytes(ca.pub))
except Exception:
log.error('Could not load CA certificate.')
return self.fail(u'internal_error')
try:
responder_key = self.get_responder_key()
responder_cert = self.get_responder_cert()
except Exception:
log.error('Could not read responder key/cert.')
return self.fail(u'internal_error')
builder = OCSPResponseBuilder(
response_status=u'successful', # ResponseStatus.successful.value,
certificate=load_certificate(force_bytes(cert.pub)),
certificate_status=force_text(cert.ocsp_status),
revocation_date=cert.revoked_date,
)
# Parse extensions
for extension in tbs_request['request_extensions']:
extn_id = extension['extn_id'].native
critical = extension['critical'].native
value = extension['extn_value'].parsed
# This variable tracks whether any unknown extensions were encountered
unknown = False
# Handle nonce extension
if extn_id == 'nonce':
builder.nonce = value.native
# That's all we know
else: # pragma: no cover
unknown = True
# If an unknown critical extension is encountered (which should not
# usually happen, according to RFC 6960 4.1.2), we should throw our
# hands up in despair and run.
if unknown is True and critical is True: # pragma: no cover
log.warning('Could not parse unknown critical extension: %r',
dict(extension.native))
return self._fail('internal_error')
# If it's an unknown non-critical extension, we can safely ignore it.
elif unknown is True: # pragma: no cover
log.info('Ignored unknown non-critical extension: %r', dict(extension.native))
builder.certificate_issuer = ca_cert
builder.next_update = datetime.utcnow() + timedelta(seconds=self.expires)
return builder.build(responder_key, responder_cert)