Source code for django_ca.views

# -*- coding: utf-8 -*-
#
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca.  If not,
# see <http://www.gnu.org/licenses/>.

import base64
import logging
import os
from datetime import datetime
from datetime import timedelta

import asn1crypto
from ocspbuilder import OCSPResponseBuilder
from OpenSSL import crypto
from oscrypto.asymmetric import load_certificate
from oscrypto.asymmetric import load_private_key

from django.core.cache import cache
from django.core.exceptions import ImproperlyConfigured
from django.core.urlresolvers import reverse
from django.http import HttpResponse
from django.utils import six
from django.utils.decorators import classonlymethod
from django.utils.decorators import method_decorator
from django.utils.encoding import force_bytes
from django.utils.encoding import force_text
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View
from django.views.generic.detail import SingleObjectMixin
from django.views.generic.edit import UpdateView

from .crl import get_crl
from .forms import RevokeCertificateForm
from .models import Certificate
from .models import CertificateAuthority
from .utils import serial_from_int

log = logging.getLogger(__name__)


[docs]class CertificateRevocationListView(View, SingleObjectMixin): """Generic view that provides Certificate Revocation Lists (CRLs).""" slug_field = 'serial' slug_url_kwarg = 'serial' queryset = CertificateAuthority.objects.all().prefetch_related('certificate_set') # parameters for the CRL itself type = crypto.FILETYPE_ASN1 """Filetype for CRL, one of the ``OpenSSL.crypto.FILETYPE_*`` variables. The default is ``OpenSSL.crypto.FILETYPE_ASN1``.""" expires = 600 """CRL expires in this many seconds.""" digest = 'sha512' """Digest used for generating the CRL.""" # header used in the request content_type = 'application/pkix-crl' """The value of the Content-Type header used in the response. For CRLs in PEM format, use ``"text/plain"``.""" def get(self, request, serial): cache_key = 'crl_%s_%s_%s' % (serial, self.type, self.digest) crl = cache.get(cache_key) if crl is None: ca = self.get_object() crl = get_crl(ca, type=self.type, expires=self.expires, digest=force_bytes(self.digest)) cache.set(cache_key, crl, self.expires) return HttpResponse(crl, content_type=self.content_type)
class RevokeCertificateView(UpdateView): admin_site = None queryset = Certificate.objects.filter(revoked=False) form_class = RevokeCertificateForm template_name = 'django_ca/admin/certificate_revoke_form.html' def get_context_data(self, **kwargs): context = super(RevokeCertificateView, self).get_context_data(**kwargs) context.update(self.admin_site.each_context(self.request)) context['opts'] = self.queryset.model._meta # required by breadcrumbs return context def form_valid(self, form): reason = form.cleaned_data['revoked_reason'] or None form.instance.revoke(reason=reason) return super(RevokeCertificateView, self).form_valid(form) def get_success_url(self): meta = self.queryset.model._meta return reverse('admin:%s_%s_change' % (meta.app_label, meta.verbose_name), args=(self.object.pk, ))
[docs]class OCSPView(View): """View to provide an OCSP responder. .. seealso:: This is heavily inspired by https://github.com/threema-ch/ocspresponder/blob/master/ocspresponder/__init__.py. """ ca = None """The serial of your certificate authority.""" responder_key = None """Absolute path to the private key used for signing OCSP responses.""" responder_cert = None """Absolute path, serial of the public key or key itself used for signing OCSP responses.""" expires = 600 """Time in seconds that the responses remain valid. The default is 600 seconds or ten minutes.""" @classonlymethod def as_view(cls, responder_key, responder_cert, **kwargs): # Preload the responder key and certificate for faster access. try: with open(responder_key, 'rb') as stream: responder_key = stream.read() except: raise ImproperlyConfigured('%s: Could not read private key.' % responder_key) if os.path.exists(responder_cert): with open(responder_cert, 'rb') as stream: responder_cert = stream.read() elif isinstance(responder_cert, six.string_types) and len(responder_cert) == 47: try: cert = Certificate.objects.get(serial=responder_cert) responder_cert = force_bytes(cert.pub) except Certificate.DoesNotExist: pass if not responder_cert: raise ImproperlyConfigured('%s: Could not read public key.' % responder_cert) return super(OCSPView, cls).as_view( responder_key=responder_key, responder_cert=responder_cert, **kwargs) @method_decorator(csrf_exempt) def dispatch(self, *args, **kwargs): return super(OCSPView, self).dispatch(*args, **kwargs) def get(self, request, data): return self.process_ocsp_request(base64.b64decode(data)) def post(self, request): return self.process_ocsp_request(request.body) def fail(self, reason): builder = OCSPResponseBuilder(response_status=reason) return builder.build() def process_ocsp_request(self, data): status = 200 try: response = self.get_ocsp_response(data) except Exception as e: log.exception(e) response = self.fail(u'internal_error') status = 500 return HttpResponse(response.dump(), status=status, content_type='application/ocsp-response') def get_ocsp_response(self, data): try: ocsp_request = asn1crypto.ocsp.OCSPRequest.load(data) tbs_request = ocsp_request['tbs_request'] request_list = tbs_request['request_list'] if len(request_list) != 1: log.error('Received OCSP request with multiple sub requests') raise NotImplemented('Combined requests not yet supported') single_request = request_list[0] # TODO: Support more than one request req_cert = single_request['req_cert'] serial = serial_from_int(req_cert['serial_number'].native) except Exception as e: log.exception('Error parsing OCSP request: %s', e) return self.fail(u'malformed_request') # Get CA and certificate ca = CertificateAuthority.objects.get(serial=self.ca) try: cert = Certificate.objects.filter(ca=ca).get(serial=serial) except Certificate.DoesNotExist: log.warn('OCSP request for unknown cert received.') return self.fail(u'internal_error') # load ca cert and responder key/cert ca_cert = load_certificate(force_bytes(ca.pub)) responder_key = load_private_key(self.responder_key) responder_cert = load_certificate(self.responder_cert) builder = OCSPResponseBuilder( response_status=u'successful', # ResponseStatus.successful.value, certificate=load_certificate(force_bytes(cert.pub)), certificate_status=force_text(cert.ocsp_status), revocation_date=cert.revoked_date, ) # Parse extensions for extension in tbs_request['request_extensions']: extn_id = extension['extn_id'].native critical = extension['critical'].native value = extension['extn_value'].parsed # This variable tracks whether any unknown extensions were encountered unknown = False # Handle nonce extension if extn_id == 'nonce': builder.nonce = value.native # That's all we know else: # pragma: no cover unknown = True # If an unknown critical extension is encountered (which should not # usually happen, according to RFC 6960 4.1.2), we should throw our # hands up in despair and run. if unknown is True and critical is True: # pragma: no cover log.warning('Could not parse unknown critical extension: %r', dict(extension.native)) return self._fail('internal_error') # If it's an unknown non-critical extension, we can safely ignore it. elif unknown is True: # pragma: no cover log.info('Ignored unknown non-critical extension: %r', dict(extension.native)) builder.certificate_issuer = ca_cert builder.next_update = datetime.utcnow() + timedelta(seconds=self.expires) return builder.build(responder_key, responder_cert)