Source code for django_ca.utils

# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca.  If not,
# see <http://www.gnu.org/licenses/>.

"""Central functions to load CA key and cert as PKey/X509 objects."""

import binascii
import os
import re
import shlex
from datetime import datetime
from datetime import timedelta
from ipaddress import ip_address
from ipaddress import ip_network

import idna

from asn1crypto.core import OctetString
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.oid import NameOID

from django.core.files.storage import get_storage_class
from django.core.serializers.json import DjangoJSONEncoder
from django.core.validators import URLValidator
from django.utils.encoding import force_bytes
from django.utils.encoding import force_str
from django.utils.functional import Promise
from django.utils.translation import gettext_lazy as _

from . import ca_settings

# List of possible subject fields, in order
SUBJECT_FIELDS = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress', ]

# Description strings for various X509 extensions, taken from "man x509v3_config".
EXTENDED_KEY_USAGE_DESC = _('Purposes for which the certificate public key can be used for.')
KEY_USAGE_DESC = _('Permitted key usages.')

#: Regular expression to match RDNs out of a full x509 name.
NAME_RE = re.compile(r'(?:/+|\A)\s*(?P<field>[^\s]*?)\s*'
                     r'=(?P<quote>[\'"])?\s*(?P<content>(?(quote).*?|[^/]*))\s*'
                     r'(?(quote)(?<!\\)(?P=quote))', re.I)

#: Regular expression to match general names.
GENERAL_NAME_RE = re.compile('^(email|URI|IP|DNS|RID|dirName|otherName):(.*)', flags=re.I)

#: Regular expression matching hexlified certificate serials
SERIAL_RE = re.compile('^([0-9A-F][0-9A-F]:?)+[0-9A-F][0-9A-F]?$')
_datetime_format = '%Y%m%d%H%M%SZ'

SAN_NAME_MAPPINGS = {
    x509.DNSName: 'DNS',
    x509.RFC822Name: 'email',
    x509.DirectoryName: 'dirname',
    x509.UniformResourceIdentifier: 'URI',
    x509.IPAddress: 'IP',
    x509.RegisteredID: 'RID',
    x509.OtherName: 'otherName',
}

#: Map OID objects to IDs used in subject strings
OID_NAME_MAPPINGS = {
    NameOID.COUNTRY_NAME: 'C',
    NameOID.STATE_OR_PROVINCE_NAME: 'ST',
    NameOID.LOCALITY_NAME: 'L',
    NameOID.ORGANIZATION_NAME: 'O',
    NameOID.ORGANIZATIONAL_UNIT_NAME: 'OU',
    NameOID.COMMON_NAME: 'CN',
    NameOID.EMAIL_ADDRESS: 'emailAddress',
    NameOID.SERIAL_NUMBER: 'serialNumber',
    NameOID.JURISDICTION_COUNTRY_NAME: 'jurisdictionCountryName',
    NameOID.JURISDICTION_STATE_OR_PROVINCE_NAME: 'jurisdictionStateOrProvinceName',
    NameOID.BUSINESS_CATEGORY: "businessCategory",
    NameOID.POSTAL_CODE: "postalCode",
    NameOID.STREET_ADDRESS: "streetAddress",
}

# same, but reversed
NAME_OID_MAPPINGS = {v: k for k, v in OID_NAME_MAPPINGS.items()}

# Some OIDs can occur multiple times
MULTIPLE_OIDS = (
    NameOID.ORGANIZATIONAL_UNIT_NAME,
    NameOID.STREET_ADDRESS,
)

# uppercase values as keys for normalizing case
NAME_CASE_MAPPINGS = {v.upper(): v for v in OID_NAME_MAPPINGS.values()}


[docs]class LazyEncoder(DjangoJSONEncoder): """Encoder that also encodes strings translated with gettext_lazy.""" def default(self, obj): if isinstance(obj, Promise): return force_str(obj) return super(LazyEncoder, self).default(obj)
[docs]def sort_name(subject): """Returns the subject in the correct order for a x509 subject.""" return sorted(subject, key=lambda e: SUBJECT_FIELDS.index(e[0]))
[docs]def format_name(subject): """Convert a subject into the canonical form for distinguished names. This function does not take care of sorting the subject in any meaningful order. Examples:: >>> format_name([('CN', 'example.com'), ]) '/CN=example.com' >>> format_name([('CN', 'example.com'), ('O', "My Organization"), ]) '/CN=example.com/O=My Organization' """ if isinstance(subject, x509.Name): subject = [(OID_NAME_MAPPINGS[s.oid], s.value) for s in subject] return '/%s' % ('/'.join(['%s=%s' % (force_str(k), force_str(v)) for k, v in subject]))
[docs]def format_relative_name(name): """Convert a relative name (RDN) into a canonical form. Examples:: >>> format_relative_name([('C', 'AT'), ('CN', 'example.com')]) '/C=AT/CN=example.com' >>> format_relative_name(x509.RelativeDistinguishedName([ ... x509.NameAttribute(NameOID.COMMON_NAME, 'example.com') ... ])) '/CN=example.com' """ if isinstance(name, x509.RelativeDistinguishedName): name = [(OID_NAME_MAPPINGS[s.oid], s.value) for s in name] return '/%s' % ('/'.join(['%s=%s' % (force_str(k), force_str(v)) for k, v in name]))
[docs]def format_general_name(name): """Format a single general name. >>> import ipaddress >>> format_general_name(x509.DNSName('example.com')) 'DNS:example.com' >>> format_general_name(x509.IPAddress(ipaddress.IPv4Address('127.0.0.1'))) 'IP:127.0.0.1' """ if isinstance(name, x509.DirectoryName): value = format_name(name.value) else: value = name.value return '%s:%s' % (SAN_NAME_MAPPINGS[type(name)], value)
[docs]def is_power2(num): """Return True if num is a power of 2. >>> is_power2(4) True >>> is_power2(3) False """ return num != 0 and ((num & (num - 1)) == 0)
[docs]def multiline_url_validator(value): """Validate that a TextField contains one valid URL per line. .. seealso:: https://docs.djangoproject.com/en/1.9/ref/validators/ """ validator = URLValidator() for line in value.splitlines(): validator(line)
[docs]def add_colons(s, pad='0'): """Add colons after every second digit. This function is used in functions to prettify serials. >>> add_colons('teststring') 'te:st:st:ri:ng' Parameters ---------- s : str The string to add colons to pad : str, default If not None, pad the string so that the last element always has two characters. The default is ``"0"``. """ if len(s) % 2 == 1 and pad is not None: s = '%s%s' % (pad, s) return ':'.join([s[i:i + 2] for i in range(0, len(s), 2)])
[docs]def int_to_hex(i): """Create a hex-representation of the given serial. >>> int_to_hex(12345678) 'BC614E' """ return hex(i)[2:].upper()
[docs]def bytes_to_hex(v): """Convert a bytes array to hex. >>> bytes_to_hex(b'test') '74:65:73:74' """ return add_colons(binascii.hexlify(v).upper().decode('utf-8'))
[docs]def hex_to_bytes(v): """Convert a hex number to bytes. This should be the inverse of :py:func:`~django_ca.utils.bytes_to_hex`. >>> hex_to_bytes('74:65:73:74') b'test' """ return binascii.unhexlify(v.replace(':', ''))
[docs]def parse_name(name): """Parses a subject string as used in OpenSSLs command line utilities. The ``name`` is expected to be close to the subject format commonly used by OpenSSL, for example ``/C=AT/L=Vienna/CN=example.com/emailAddress=user@example.com``. The function does its best to be lenient on deviations from the format, object identifiers are case-insensitive (e.g. ``cn`` is the same as ``CN``, whitespace at the start and end is stripped and the subject does not have to start with a slash (``/``). >>> parse_name('/CN=example.com') [('CN', 'example.com')] >>> parse_name('c=AT/l= Vienna/o="ex org"/CN=example.com') [('C', 'AT'), ('L', 'Vienna'), ('O', 'ex org'), ('CN', 'example.com')] Dictionary keys are normalized to the values of :py:const:`OID_NAME_MAPPINGS` and keys will be sorted based on x509 name specifications regardless of the given order: >>> parse_name('L="Vienna / District"/EMAILaddress=user@example.com') [('L', 'Vienna / District'), ('emailAddress', 'user@example.com')] >>> parse_name('/C=AT/CN=example.com') == parse_name('/CN=example.com/C=AT') True Due to the magic of :py:const:`NAME_RE`, the function even supports quoting strings and including slashes, so strings like ``/OU="Org / Org Unit"/CN=example.com`` will work as expected. >>> parse_name('L="Vienna / District"/CN=example.com') [('L', 'Vienna / District'), ('CN', 'example.com')] But note that it's still easy to trick this function, if you really want to. The following example is *not* a valid subject, the location is just bogus, and whatever you were expecting as output, it's certainly different: >>> parse_name('L="Vienna " District"/CN=example.com') [('L', 'Vienna'), ('CN', 'example.com')] Examples of where this string is used are: .. code-block:: console # openssl req -new -key priv.key -out csr -utf8 -batch -sha256 -subj '/C=AT/CN=example.com' # openssl x509 -in cert.pem -noout -subject -nameopt compat /C=AT/L=Vienna/CN=example.com """ name = name.strip() if not name: # empty subjects are ok return [] try: items = [(NAME_CASE_MAPPINGS[t[0].upper()], force_str(t[2])) for t in NAME_RE.findall(name)] except KeyError as e: raise ValueError('Unknown x509 name field: %s' % e.args[0]) # Check that no OIDs not in MULTIPLE_OIDS occur more then once for key, oid in NAME_OID_MAPPINGS.items(): if sum(1 for t in items if t[0] == key) > 1 and oid not in MULTIPLE_OIDS: raise ValueError('Subject contains multiple "%s" fields' % key) return sort_name(items)
[docs]def x509_name(name): """Parses a subject into a :py:class:`x509.Name <cg:cryptography.x509.Name>`. If ``name`` is a string, :py:func:`parse_name` is used to parse it. >>> x509_name('/C=AT/CN=example.com') <Name(C=AT,CN=example.com)> >>> x509_name([('C', 'AT'), ('CN', 'example.com')]) <Name(C=AT,CN=example.com)> """ if isinstance(name, str): name = parse_name(name) return x509.Name([x509.NameAttribute(NAME_OID_MAPPINGS[typ], force_str(value)) for typ, value in name])
[docs]def x509_relative_name(name): """Parse a relative name (RDN) into a :py:class:`~cg:cryptography.x509.RelativeDistinguishedName`. >>> x509_relative_name('/CN=example.com') <RelativeDistinguishedName(CN=example.com)> >>> x509_relative_name([('CN', 'example.com')]) <RelativeDistinguishedName(CN=example.com)> """ if isinstance(name, x509.RelativeDistinguishedName): return name elif isinstance(name, str): name = parse_name(name) return x509.RelativeDistinguishedName([ x509.NameAttribute(NAME_OID_MAPPINGS[typ], force_str(value)) for typ, value in name ])
[docs]def validate_email(addr): """Validate an email address. This function raises ``ValueError`` if the email address is not valid. >>> validate_email('foo@bar.com') 'foo@bar.com' >>> validate_email('foo@bar com') Traceback (most recent call last): ... ValueError: Invalid domain: bar com """ if '@' not in addr: raise ValueError('Invalid email address: %s' % addr) node, domain = addr.split('@', 1) try: domain = idna.encode(force_str(domain)) except idna.core.IDNAError: raise ValueError('Invalid domain: %s' % domain) return '%s@%s' % (node, force_str(domain))
[docs]def validate_hostname(hostname, allow_port=False): """Validate a hostname, optionally with a given port. >>> validate_hostname('example.com') 'example.com' >>> validate_hostname('example.com:8000', allow_port=True) 'example.com:8000' Parameters ---------- hostname : str The hostname to validate. allow_port : bool, optional If ``True``, the hostname can also contain an optional port number, e.g. "example.com:8000". Raises ------ ValueError If hostname or port are not valid. """ port = None if allow_port is True and ':' in hostname: hostname, port = hostname.rsplit(':', 1) try: port = int(port) except ValueError: raise ValueError('%s: Port must be an integer' % port) else: if port < 1 or port > 65535: raise ValueError('%s: Port must be between 1 and 65535' % port) try: encoded = idna.encode(hostname).decode('utf-8') except idna.IDNAError: raise ValueError('%s: Not a valid hostname' % hostname) if allow_port is True and port is not None: return '%s:%s' % (encoded, port) return encoded
[docs]def validate_key_parameters(key_size=None, key_type='RSA', ecc_curve=None): """Validate parameters for private key generation and return sanitized values. This function can be used to fail early if invalid parameters are passed, before the private key is generated. >>> validate_key_parameters() # defaults (1024, 'RSA', None) >>> validate_key_parameters(4096, 'ECC', None) # doctest: +ELLIPSIS (None, 'ECC', <cryptography.hazmat.primitives.asymmetric.ec.SECP256R1 object at ...>) >>> validate_key_parameters(4000, 'RSA', None) Traceback (most recent call last): ... ValueError: 4000: Key size must be a power of two """ if key_type is None: key_type = 'RSA' if key_type == 'ECC': key_size = None ecc_curve = parse_key_curve(ecc_curve) elif key_type in ['DSA', 'RSA']: if key_size is None: key_size = ca_settings.CA_DEFAULT_KEY_SIZE if not is_power2(key_size): raise ValueError("%s: Key size must be a power of two" % key_size) elif key_size < ca_settings.CA_MIN_KEY_SIZE: raise ValueError("%s: Key size must be least %s bits" % ( key_size, ca_settings.CA_MIN_KEY_SIZE)) else: raise ValueError('%s: Unknown key type' % key_type) return key_size, key_type, ecc_curve
[docs]def generate_private_key(key_size, key_type, ecc_curve): """Generate a private key. This function assumes that you called :py:func:`~django_ca.utils.validate_key_parameters` on the input values and does not do any sanity checks on its own. Parameters ---------- key_size : int The size of the private key (not used for ECC keys). key_type : {'RSA', 'DSA', 'ECC'} The type of the private key. ecc_curve : :py:class:`~cg:cryptography.hazmat.primitives.asymmetric.ec.EllipticCurve` The ECC curve to use for an ECC key. Returns ------- key A private key of the appropriate type. """ if key_type == 'DSA': private_key = dsa.generate_private_key(key_size=key_size, backend=default_backend()) elif key_type == 'ECC': private_key = ec.generate_private_key(ecc_curve, default_backend()) else: private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend()) return private_key
[docs]def parse_general_name(name): """Parse a general name from user input. This function will do its best to detect the intended type of any value passed to it: >>> parse_general_name('example.com') <DNSName(value='example.com')> >>> parse_general_name('*.example.com') <DNSName(value='*.example.com')> >>> parse_general_name('.example.com') # Syntax used e.g. for NameConstraints: All levels of subdomains <DNSName(value='.example.com')> >>> parse_general_name('user@example.com') <RFC822Name(value='user@example.com')> >>> parse_general_name('https://example.com') <UniformResourceIdentifier(value='https://example.com')> >>> parse_general_name('1.2.3.4') <IPAddress(value=1.2.3.4)> >>> parse_general_name('fd00::1') <IPAddress(value=fd00::1)> >>> parse_general_name('/CN=example.com') <DirectoryName(value=<Name(CN=example.com)>)> The default fallback is to assume a :py:class:`~cg:cryptography.x509.DNSName`. If this doesn't work, an exception will be raised: >>> parse_general_name('foo..bar`*123') # doctest: +ELLIPSIS Traceback (most recent call last): ... idna.core.IDNAError: ... If you want to override detection, you can prefix the name to match :py:const:`GENERAL_NAME_RE`: >>> parse_general_name('email:user@example.com') <RFC822Name(value='user@example.com')> >>> parse_general_name('URI:https://example.com') <UniformResourceIdentifier(value='https://example.com')> >>> parse_general_name('dirname:/CN=example.com') <DirectoryName(value=<Name(CN=example.com)>)> Some more exotic values can only be generated by using this prefix: >>> parse_general_name('rid:2.5.4.3') <RegisteredID(value=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>)> >>> parse_general_name('otherName:2.5.4.3;UTF8:example.com') <OtherName(type_id=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=b'example.com')> If you give a prefixed value, this function is less forgiving of any typos and does not catch any exceptions: >>> parse_general_name('email:foo@bar com') Traceback (most recent call last): ... ValueError: Invalid domain: bar com """ if isinstance(name, x509.GeneralName): return name elif not isinstance(name, str): raise ValueError('Cannot parse general name %s: Must be of type str (was: %s).' % (name, type(name).__name__)) typ = None match = GENERAL_NAME_RE.match(name) if match is not None: typ, name = match.groups() typ = typ.lower() if typ is None: if re.match('[a-z0-9]{2,}://', name): # Looks like a URI try: return x509.UniformResourceIdentifier(name) except Exception: # pragma: no cover - this really accepts anything pass if '@' in name: # Looks like an Email address try: return x509.RFC822Name(validate_email(name)) except Exception: pass if name.strip().startswith('/'): # maybe it's a dirname? return x509.DirectoryName(x509_name(name)) # Try to parse this as IPAddress/Network try: return x509.IPAddress(ip_address(name)) except ValueError: pass try: return x509.IPAddress(ip_network(name)) except ValueError: pass # Try to encode as domain name. DNSName() does not validate the domain name, but this check will fail. if name.startswith('*.'): idna.encode(name[2:]) elif name.startswith('.'): idna.encode(name[1:]) else: idna.encode(name) # Almost anything passes as DNS name, so this is our default fallback return x509.DNSName(name) if typ == 'uri': return x509.UniformResourceIdentifier(name) elif typ == 'email': return x509.RFC822Name(validate_email(name)) elif typ == 'ip': try: return x509.IPAddress(ip_address(name)) except ValueError: pass try: return x509.IPAddress(ip_network(name)) except ValueError: pass raise ValueError('Could not parse IP address.') elif typ == 'rid': return x509.RegisteredID(x509.ObjectIdentifier(name)) elif typ == 'othername': regex = "(.*);(.*):(.*)" if re.match(regex, name) is not None: oid, asn_typ, val = re.match(regex, name).groups() oid = x509.ObjectIdentifier(oid) if asn_typ == 'UTF8': val = val.encode('utf-8') elif asn_typ == 'OctetString': val = bytes(bytearray.fromhex(val)) val = OctetString(val).dump() else: raise ValueError('Unsupported ASN type in otherName: %s' % asn_typ) val = force_bytes(val) return x509.OtherName(oid, val) else: raise ValueError('Incorrect otherName format: %s' % name) elif typ == 'dirname': return x509.DirectoryName(x509_name(name)) else: # Try to encode the domain name. DNSName() does not validate the domain name, but this # check will fail. if name.startswith('*.'): idna.encode(name[2:]) elif name.startswith('.'): idna.encode(name[1:]) else: idna.encode(name) return x509.DNSName(name)
[docs]def parse_hash_algorithm(value=None): """Parse a hash algorithm value. The most common use case is to pass a str naming a class in :py:mod:`~cg:cryptography.hazmat.primitives.hashes`. For convenience, passing ``None`` will return the value of :ref:`CA_DIGEST_ALGORITHM <settings-ca-digest-algorithm>`, and passing an :py:class:`~cg:cryptography.hazmat.primitives.hashes.HashAlgorithm` will return that instance unchanged. Example usage:: >>> parse_hash_algorithm() # doctest: +ELLIPSIS <cryptography.hazmat.primitives.hashes.SHA512 object at ...> >>> parse_hash_algorithm('SHA512') # doctest: +ELLIPSIS <cryptography.hazmat.primitives.hashes.SHA512 object at ...> >>> parse_hash_algorithm(' SHA512 ') # doctest: +ELLIPSIS <cryptography.hazmat.primitives.hashes.SHA512 object at ...> >>> parse_hash_algorithm(hashes.SHA512) # doctest: +ELLIPSIS <cryptography.hazmat.primitives.hashes.SHA512 object at ...> >>> parse_hash_algorithm(hashes.SHA512()) # doctest: +ELLIPSIS <cryptography.hazmat.primitives.hashes.SHA512 object at ...> >>> parse_hash_algorithm('Wrong') # doctest: +ELLIPSIS Traceback (most recent call last): ... ValueError: Unknown hash algorithm: Wrong >>> parse_hash_algorithm(object()) # doctest: +ELLIPSIS Traceback (most recent call last): ... ValueError: Unknown type passed: object Parameters ---------- value : str or :py:class:`~cg:cryptography.hazmat.primitives.hashes.HashAlgorithm`, optional The value to parse, the function description on how possible values are used. Returns ------- algorithm A :py:class:`~cg:cryptography.hazmat.primitives.hashes.HashAlgorithm` instance. Raises ------ ValueError If an unknown object is passed or if ``value`` does not name a known algorithm. """ if value is None: return ca_settings.CA_DIGEST_ALGORITHM elif isinstance(value, type) and issubclass(value, hashes.HashAlgorithm): return value() elif isinstance(value, hashes.HashAlgorithm): return value elif isinstance(value, str): try: return getattr(hashes, value.strip())() except AttributeError: raise ValueError('Unknown hash algorithm: %s' % value) else: raise ValueError('Unknown type passed: %s' % type(value).__name__)
[docs]def parse_encoding(value=None): """Parse a value to a valid encoding. This function accepts either a member of :py:class:`~cg:cryptography.hazmat.primitives.serialization.Encoding` or a string describing a member. If no value is passed, it will assume ``PEM`` as a default value. Note that ``"ASN1"`` is treated as an alias for ``"DER"``. >>> parse_encoding() <Encoding.PEM: 'PEM'> >>> parse_encoding('DER') <Encoding.DER: 'DER'> >>> parse_encoding(Encoding.PEM) <Encoding.PEM: 'PEM'> """ if value is None: return ca_settings.CA_DEFAULT_ENCODING elif isinstance(value, Encoding): return value elif isinstance(value, str): if value == 'ASN1': value = 'DER' try: return getattr(Encoding, value) except AttributeError: raise ValueError('Unknown encoding: %s' % value) else: raise ValueError('Unknown type passed: %s' % type(value).__name__)
[docs]def parse_key_curve(value=None): """Parse an elliptic curve value. This function uses a value identifying an elliptic curve to return an :py:class:`~cg:cryptography.hazmat.primitives.asymmetric.ec.EllipticCurve` instance. The name must match a class name of one of the classes named under "Elliptic Curves" in :any:`cg:hazmat/primitives/asymmetric/ec`. For convenience, passing ``None`` will return the value of :ref:`CA_DEFAULT_ECC_CURVE <settings-ca-default-ecc-curve>`, and passing an :py:class:`~cg:cryptography.hazmat.primitives.asymmetric.ec.EllipticCurve` will return that instance unchanged. Example usage:: >>> parse_key_curve('SECP256R1') # doctest: +ELLIPSIS <cryptography.hazmat.primitives.asymmetric.ec.SECP256R1 object at ...> >>> parse_key_curve('SECP384R1') # doctest: +ELLIPSIS <cryptography.hazmat.primitives.asymmetric.ec.SECP384R1 object at ...> >>> parse_key_curve(ec.SECP256R1()) # doctest: +ELLIPSIS <cryptography.hazmat.primitives.asymmetric.ec.SECP256R1 object at ...> >>> parse_key_curve() # doctest: +ELLIPSIS <cryptography.hazmat.primitives.asymmetric.ec.SECP256R1 object at ...> Parameters ---------- value : str, otional The name of the curve or ``None`` to return the default curve. Returns ------- curve An :py:class:`~cg:cryptography.hazmat.primitives.asymmetric.ec.EllipticCurve` instance. Raises ------ ValueError If the named curve is not supported. """ if isinstance(value, ec.EllipticCurve): return value # name was already parsed if value is None: return ca_settings.CA_DEFAULT_ECC_CURVE curve = getattr(ec, value.strip(), type) if not issubclass(curve, ec.EllipticCurve): raise ValueError('%s: Not a known Eliptic Curve' % value) return curve()
[docs]def get_cert_builder(expires, serial=None): """Get a basic X509 cert builder object. .. TODO:: deprecate support for passing datetime as expires Parameters ---------- expires : datetime or timedelta When this certificate will expire. serial : int, optional Serial number to set for this certificate. Use :py:func:`~cg:cryptography.x509.random_serial_number` to generate such a value. By default, a value will be generated. """ now = datetime.utcnow().replace(second=0, microsecond=0) if serial is None: serial = x509.random_serial_number() if expires is None: expires = now + ca_settings.CA_DEFAULT_EXPIRES elif isinstance(expires, timedelta): expires = now + expires else: expires = expires.replace(second=0, microsecond=0) builder = x509.CertificateBuilder() builder = builder.not_valid_before(now) builder = builder.not_valid_after(expires) builder = builder.serial_number(serial) return builder
[docs]def read_file(path): """Read the file from the given path. If ``path`` is an absolute path, reads a file from the local filesystem. For relative paths, read the file using the storage backend configured using :ref:`CA_FILE_STORAGE <settings-ca-file-storage>`. """ if os.path.isabs(path): with open(path, 'rb') as stream: return stream.read() stream = ca_storage.open(path) try: # NOTE: In the python:3.9-rc-alpine3.10 Docker image, this is marked as a missed branch :-( return stream.read() # pragma: no branch finally: stream.close()
# Note used currently, but left here for future reference #def write_private_file(path, data): # """Function to write binary data to a file that will only be readable to the user.""" # # with os.fdopen(os.open(path, os.O_CREAT | os.O_WRONLY, 0o400), 'wb') as fh: # fh.write(data)
[docs]def shlex_split(s, sep): """Split a character on the given set of characters. Example:: >>> shlex_split('foo,bar', ', ') ['foo', 'bar'] >>> shlex_split('foo\\\\,bar1', ',') # escape a separator ['foo,bar1'] >>> shlex_split('"foo,bar", bla', ', ') ['foo,bar', 'bla'] >>> shlex_split('foo,"bar,bla"', ',') ['foo', 'bar,bla'] """ lex = shlex.shlex(s, posix=True) lex.whitespace = sep lex.whitespace_split = True return [line for line in lex]
[docs]class GeneralNameList(list): """List that holds :py:class:`~cg:cryptography.x509.GeneralName` instances and parses ``str`` when added. A ``GeneralNameList`` is a ``list`` subclass that will always only hold :py:class:`~cg:cryptography.x509.GeneralName` instances, but any ``str`` passed to it will be passed to :py:func:`~django_ca.utils.parse_general_name`:: >>> from cryptography import x509 >>> l = GeneralNameList(['example.com']) >>> l += ['DNS:example.net', x509.DNSName('example.org')] >>> print(l) <GeneralNameList: ['DNS:example.com', 'DNS:example.net', 'DNS:example.org']> >>> 'example.com' in l, 'DNS:example.com' in l, x509.DNSName('example.com') in l (True, True, True) >>> l == ['example.com', 'example.net', 'example.org'] True >>> l == [x509.DNSName('example.com'), 'example.net', 'DNS:example.org'] True """ def __init__(self, iterable=tuple()): if isinstance(iterable, (str, x509.GeneralName)): iterable = [iterable] super().__init__(parse_general_name(v) for v in iterable) def serialize(self): for v in self: yield format_general_name(v) def __add__(self, value): # self + other_list if isinstance(value, GeneralNameList) is False: value = GeneralNameList(value) return GeneralNameList(list(self) + list(value)) def __contains__(self, value): # value in self try: value = parse_general_name(value) except ValueError: return False return list.__contains__(self, value) def __eq__(self, other): # value == other if isinstance(other, GeneralNameList) is False and isinstance(other, list) is True: other = GeneralNameList(other) return list.__eq__(self, other) def __iadd__(self, value): # self += value return list.__iadd__(self, (parse_general_name(v) for v in value)) def __repr__(self): return '<GeneralNameList: %r>' % [format_general_name(v) for v in self] def __setitem__(self, key, value): # l[0] = 'example.com' if isinstance(key, slice): # l[0:1] = ['example.com'] list.__setitem__(self, key, (parse_general_name(v) for v in value)) else: list.__setitem__(self, key, parse_general_name(value)) def append(self, object): list.append(self, parse_general_name(object)) def count(self, value): try: value = parse_general_name(value) except ValueError: return 0 return list.count(self, value) def extend(self, iterable): list.extend(self, (parse_general_name(i) for i in iterable)) def index(self, value, *args): return list.index(self, parse_general_name(value), *args) def insert(self, index, object): list.insert(self, index, parse_general_name(object)) def remove(self, value): list.remove(self, parse_general_name(value))
def get_crl_cache_key(serial, algorithm=hashes.SHA512, encoding=Encoding.DER, scope=None): return 'crl_%s_%s_%s_%s' % (serial, algorithm.name, encoding.name, scope) ca_storage = get_storage_class(ca_settings.CA_FILE_STORAGE)(**ca_settings.CA_FILE_STORAGE_KWARGS)