PNG  IHDR;IDATxܻn0K )(pA 7LeG{ §㻢|ذaÆ 6lذaÆ 6lذaÆ 6lom$^yذag5bÆ 6lذaÆ 6lذa{ 6lذaÆ `}HFkm,mӪôô! x|'ܢ˟;E:9&ᶒ}{v]n&6 h_tڠ͵-ҫZ;Z$.Pkž)!o>}leQfJTu іچ\X=8Rن4`Vwl>nG^is"ms$ui?wbs[m6K4O.4%/bC%t Mז -lG6mrz2s%9s@-k9=)kB5\+͂Zsٲ Rn~GRC wIcIn7jJhۛNCS|j08yiHKֶۛkɈ+;SzL/F*\Ԕ#"5m2[S=gnaPeғL lذaÆ 6l^ḵaÆ 6lذaÆ 6lذa; _ذaÆ 6lذaÆ 6lذaÆ RIENDB` # Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file import zlib from jwcrypto import common from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto.common import json_decode, json_encode from jwcrypto.jwa import JWA # RFC 7516 - 4.1 # name: (description, supported?) JWEHeaderRegistry = {'alg': ('Algorithm', True), 'enc': ('Encryption Algorithm', True), 'zip': ('Compression Algorithm', True), 'jku': ('JWK Set URL', False), 'jwk': ('JSON Web Key', False), 'kid': ('Key ID', True), 'x5u': ('X.509 URL', False), 'x5c': ('X.509 Certificate Chain', False), 'x5t': ('X.509 Certificate SHA-1 Thumbprint', False), 'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint', False), 'typ': ('Type', True), 'cty': ('Content Type', True), 'crit': ('Critical', True)} """Registry of valid header parameters""" default_allowed_algs = [ # Key Management Algorithms 'RSA1_5', 'RSA-OAEP', 'RSA-OAEP-256', 'A128KW', 'A192KW', 'A256KW', 'dir', 'ECDH-ES', 'ECDH-ES+A128KW', 'ECDH-ES+A192KW', 'ECDH-ES+A256KW', 'A128GCMKW', 'A192GCMKW', 'A256GCMKW', 'PBES2-HS256+A128KW', 'PBES2-HS384+A192KW', 'PBES2-HS512+A256KW', # Content Encryption Algoritms 'A128CBC-HS256', 'A192CBC-HS384', 'A256CBC-HS512', 'A128GCM', 'A192GCM', 'A256GCM'] """Default allowed algorithms""" class InvalidJWEData(Exception): """Invalid JWE Object. This exception is raised when the JWE Object is invalid and/or improperly formatted. """ def __init__(self, message=None, exception=None): msg = None if message: msg = message else: msg = 'Unknown Data Verification Failure' if exception: msg += ' {%s}' % str(exception) super(InvalidJWEData, self).__init__(msg) # These have been moved to jwcrypto.common, maintain here for bacwards compat InvalidCEKeyLength = common.InvalidCEKeyLength InvalidJWEKeyLength = common.InvalidJWEKeyLength InvalidJWEKeyType = common.InvalidJWEKeyType InvalidJWEOperation = common.InvalidJWEOperation class JWE(object): """JSON Web Encryption object This object represent a JWE token. """ def __init__(self, plaintext=None, protected=None, unprotected=None, aad=None, algs=None, recipient=None, header=None): """Creates a JWE token. :param plaintext(bytes): An arbitrary plaintext to be encrypted. :param protected: A JSON string with the protected header. :param unprotected: A JSON string with the shared unprotected header. :param aad(bytes): Arbitrary additional authenticated data :param algs: An optional list of allowed algorithms :param recipient: An optional, default recipient key :param header: An optional header for the default recipient """ self._allowed_algs = None self.objects = dict() self.plaintext = None if plaintext is not None: if isinstance(plaintext, bytes): self.plaintext = plaintext else: self.plaintext = plaintext.encode('utf-8') self.cek = None self.decryptlog = None if aad: self.objects['aad'] = aad if protected: if isinstance(protected, dict): protected = json_encode(protected) else: json_decode(protected) # check header encoding self.objects['protected'] = protected if unprotected: if isinstance(unprotected, dict): unprotected = json_encode(unprotected) else: json_decode(unprotected) # check header encoding self.objects['unprotected'] = unprotected if algs: self.allowed_algs = algs if recipient: self.add_recipient(recipient, header=header) elif header: raise ValueError('Header is allowed only with default recipient') def _jwa_keymgmt(self, name): allowed = self._allowed_algs or default_allowed_algs if name not in allowed: raise InvalidJWEOperation('Algorithm not allowed') return JWA.keymgmt_alg(name) def _jwa_enc(self, name): allowed = self._allowed_algs or default_allowed_algs if name not in allowed: raise InvalidJWEOperation('Algorithm not allowed') return JWA.encryption_alg(name) @property def allowed_algs(self): """Allowed algorithms. The list of allowed algorithms. Can be changed by setting a list of algorithm names. """ if self._allowed_algs: return self._allowed_algs else: return default_allowed_algs @allowed_algs.setter def allowed_algs(self, algs): if not isinstance(algs, list): raise TypeError('Allowed Algs must be a list') self._allowed_algs = algs def _merge_headers(self, h1, h2): for k in list(h1.keys()): if k in h2: raise InvalidJWEData('Duplicate header: "%s"' % k) h1.update(h2) return h1 def _get_jose_header(self, header=None): jh = dict() if 'protected' in self.objects: ph = json_decode(self.objects['protected']) jh = self._merge_headers(jh, ph) if 'unprotected' in self.objects: uh = json_decode(self.objects['unprotected']) jh = self._merge_headers(jh, uh) if header: rh = json_decode(header) jh = self._merge_headers(jh, rh) return jh def _get_alg_enc_from_headers(self, jh): algname = jh.get('alg', None) if algname is None: raise InvalidJWEData('Missing "alg" from headers') alg = self._jwa_keymgmt(algname) encname = jh.get('enc', None) if encname is None: raise InvalidJWEData('Missing "enc" from headers') enc = self._jwa_enc(encname) return alg, enc def _encrypt(self, alg, enc, jh): aad = base64url_encode(self.objects.get('protected', '')) if 'aad' in self.objects: aad += '.' + base64url_encode(self.objects['aad']) aad = aad.encode('utf-8') compress = jh.get('zip', None) if compress == 'DEF': data = zlib.compress(self.plaintext)[2:-4] elif compress is None: data = self.plaintext else: raise ValueError('Unknown compression') iv, ciphertext, tag = enc.encrypt(self.cek, aad, data) self.objects['iv'] = iv self.objects['ciphertext'] = ciphertext self.objects['tag'] = tag def add_recipient(self, key, header=None): """Encrypt the plaintext with the given key. :param key: A JWK key or password of appropriate type for the 'alg' provided in the JOSE Headers. :param header: A JSON string representing the per-recipient header. :raises ValueError: if the plaintext is missing or not of type bytes. :raises ValueError: if the compression type is unknown. :raises InvalidJWAAlgorithm: if the 'alg' provided in the JOSE headers is missing or unknown, or otherwise not implemented. """ if self.plaintext is None: raise ValueError('Missing plaintext') if not isinstance(self.plaintext, bytes): raise ValueError("Plaintext must be 'bytes'") if isinstance(header, dict): header = json_encode(header) jh = self._get_jose_header(header) alg, enc = self._get_alg_enc_from_headers(jh) rec = dict() if header: rec['header'] = header wrapped = alg.wrap(key, enc.wrap_key_size, self.cek, jh) self.cek = wrapped['cek'] if 'ek' in wrapped: rec['encrypted_key'] = wrapped['ek'] if 'header' in wrapped: h = json_decode(rec.get('header', '{}')) nh = self._merge_headers(h, wrapped['header']) rec['header'] = json_encode(nh) if 'ciphertext' not in self.objects: self._encrypt(alg, enc, jh) if 'recipients' in self.objects: self.objects['recipients'].append(rec) elif 'encrypted_key' in self.objects or 'header' in self.objects: self.objects['recipients'] = list() n = dict() if 'encrypted_key' in self.objects: n['encrypted_key'] = self.objects.pop('encrypted_key') if 'header' in self.objects: n['header'] = self.objects.pop('header') self.objects['recipients'].append(n) self.objects['recipients'].append(rec) else: self.objects.update(rec) def serialize(self, compact=False): """Serializes the object into a JWE token. :param compact(boolean): if True generates the compact representation, otherwise generates a standard JSON format. :raises InvalidJWEOperation: if the object cannot serialized with the compact representation and `compact` is True. :raises InvalidJWEOperation: if no recipients have been added to the object. """ if 'ciphertext' not in self.objects: raise InvalidJWEOperation("No available ciphertext") if compact: for invalid in 'aad', 'unprotected': if invalid in self.objects: raise InvalidJWEOperation("Can't use compact encoding") if 'recipients' in self.objects: if len(self.objects['recipients']) != 1: raise InvalidJWEOperation("Invalid number of recipients") rec = self.objects['recipients'][0] else: rec = self.objects if 'header' in rec: # The AESGCMKW algorithm generates data (iv, tag) we put in the # per-recipient unpotected header by default. Move it to the # protected header and re-encrypt the payload, as the protected # header is used as additional authenticated data. h = json_decode(rec['header']) ph = json_decode(self.objects['protected']) nph = self._merge_headers(h, ph) self.objects['protected'] = json_encode(nph) jh = self._get_jose_header() alg, enc = self._get_alg_enc_from_headers(jh) self._encrypt(alg, enc, jh) del rec['header'] return '.'.join([base64url_encode(self.objects['protected']), base64url_encode(rec.get('encrypted_key', '')), base64url_encode(self.objects['iv']), base64url_encode(self.objects['ciphertext']), base64url_encode(self.objects['tag'])]) else: obj = self.objects enc = {'ciphertext': base64url_encode(obj['ciphertext']), 'iv': base64url_encode(obj['iv']), 'tag': base64url_encode(self.objects['tag'])} if 'protected' in obj: enc['protected'] = base64url_encode(obj['protected']) if 'unprotected' in obj: enc['unprotected'] = json_decode(obj['unprotected']) if 'aad' in obj: enc['aad'] = base64url_encode(obj['aad']) if 'recipients' in obj: enc['recipients'] = list() for rec in obj['recipients']: e = dict() if 'encrypted_key' in rec: e['encrypted_key'] = \ base64url_encode(rec['encrypted_key']) if 'header' in rec: e['header'] = json_decode(rec['header']) enc['recipients'].append(e) else: if 'encrypted_key' in obj: enc['encrypted_key'] = \ base64url_encode(obj['encrypted_key']) if 'header' in obj: enc['header'] = json_decode(obj['header']) return json_encode(enc) def _check_crit(self, crit): for k in crit: if k not in JWEHeaderRegistry: raise InvalidJWEData('Unknown critical header: "%s"' % k) else: if not JWEHeaderRegistry[k][1]: raise InvalidJWEData('Unsupported critical header: ' '"%s"' % k) # FIXME: allow to specify which algorithms to accept as valid def _decrypt(self, key, ppe): jh = self._get_jose_header(ppe.get('header', None)) # TODO: allow caller to specify list of headers it understands self._check_crit(jh.get('crit', dict())) alg = self._jwa_keymgmt(jh.get('alg', None)) enc = self._jwa_enc(jh.get('enc', None)) aad = base64url_encode(self.objects.get('protected', '')) if 'aad' in self.objects: aad += '.' + base64url_encode(self.objects['aad']) cek = alg.unwrap(key, enc.wrap_key_size, ppe.get('encrypted_key', b''), jh) data = enc.decrypt(cek, aad.encode('utf-8'), self.objects['iv'], self.objects['ciphertext'], self.objects['tag']) self.decryptlog.append('Success') self.cek = cek compress = jh.get('zip', None) if compress == 'DEF': self.plaintext = zlib.decompress(data, -zlib.MAX_WBITS) elif compress is None: self.plaintext = data else: raise ValueError('Unknown compression') def decrypt(self, key): """Decrypt a JWE token. :param key: The (:class:`jwcrypto.jwk.JWK`) decryption key. :param key: A (:class:`jwcrypto.jwk.JWK`) decryption key or a password string (optional). :raises InvalidJWEOperation: if the key is not a JWK object. :raises InvalidJWEData: if the ciphertext can't be decrypted or the object is otherwise malformed. """ if 'ciphertext' not in self.objects: raise InvalidJWEOperation("No available ciphertext") self.decryptlog = list() if 'recipients' in self.objects: for rec in self.objects['recipients']: try: self._decrypt(key, rec) except Exception as e: # pylint: disable=broad-except self.decryptlog.append('Failed: [%s]' % repr(e)) else: try: self._decrypt(key, self.objects) except Exception as e: # pylint: disable=broad-except self.decryptlog.append('Failed: [%s]' % repr(e)) if not self.plaintext: raise InvalidJWEData('No recipient matched the provided ' 'key' + repr(self.decryptlog)) def deserialize(self, raw_jwe, key=None): """Deserialize a JWE token. NOTE: Destroys any current status and tries to import the raw JWE provided. :param raw_jwe: a 'raw' JWE token (JSON Encoded or Compact notation) string. :param key: A (:class:`jwcrypto.jwk.JWK`) decryption key or a password string (optional). If a key is provided a decryption step will be attempted after the object is successfully deserialized. :raises InvalidJWEData: if the raw object is an invaid JWE token. :raises InvalidJWEOperation: if the decryption fails. """ self.objects = dict() self.plaintext = None self.cek = None o = dict() try: try: djwe = json_decode(raw_jwe) o['iv'] = base64url_decode(djwe['iv']) o['ciphertext'] = base64url_decode(djwe['ciphertext']) o['tag'] = base64url_decode(djwe['tag']) if 'protected' in djwe: p = base64url_decode(djwe['protected']) o['protected'] = p.decode('utf-8') if 'unprotected' in djwe: o['unprotected'] = json_encode(djwe['unprotected']) if 'aad' in djwe: o['aad'] = base64url_decode(djwe['aad']) if 'recipients' in djwe: o['recipients'] = list() for rec in djwe['recipients']: e = dict() if 'encrypted_key' in rec: e['encrypted_key'] = \ base64url_decode(rec['encrypted_key']) if 'header' in rec: e['header'] = json_encode(rec['header']) o['recipients'].append(e) else: if 'encrypted_key' in djwe: o['encrypted_key'] = \ base64url_decode(djwe['encrypted_key']) if 'header' in djwe: o['header'] = json_encode(djwe['header']) except ValueError: c = raw_jwe.split('.') if len(c) != 5: raise InvalidJWEData() p = base64url_decode(c[0]) o['protected'] = p.decode('utf-8') ekey = base64url_decode(c[1]) if ekey != b'': o['encrypted_key'] = base64url_decode(c[1]) o['iv'] = base64url_decode(c[2]) o['ciphertext'] = base64url_decode(c[3]) o['tag'] = base64url_decode(c[4]) self.objects = o except Exception as e: # pylint: disable=broad-except raise InvalidJWEData('Invalid format', repr(e)) if key: self.decrypt(key) @property def payload(self): if not self.plaintext: raise InvalidJWEOperation("Plaintext not available") return self.plaintext @property def jose_header(self): jh = self._get_jose_header() if len(jh) == 0: raise InvalidJWEOperation("JOSE Header not available") return jh