|
| 1 | +"""Python Flask API Auth0 integration example |
| 2 | +""" |
| 3 | + |
| 4 | +from functools import wraps |
1 | 5 | import json |
2 | 6 | from os import environ as env, path |
3 | 7 | import urllib |
4 | 8 |
|
5 | 9 | from dotenv import load_dotenv |
6 | | -from functools import wraps |
7 | 10 | from flask import Flask, request, jsonify, _app_ctx_stack |
8 | 11 | from flask_cors import cross_origin |
9 | 12 | from jose import jwt |
10 | 13 |
|
11 | | -load_dotenv(path.join(path.dirname(__file__), '.env')) |
12 | | -auth0_domain = env['AUTH0_DOMAIN'] |
13 | | -api_audience = env['API_ID'] |
| 14 | +load_dotenv(path.join(path.dirname(__file__), ".env")) |
| 15 | +AUTH0_DOMAIN = env["AUTH0_DOMAIN"] |
| 16 | +API_AUDIENCE = env["API_ID"] |
14 | 17 |
|
15 | | -app = Flask(__name__) |
| 18 | +APP = Flask(__name__) |
16 | 19 |
|
17 | 20 |
|
18 | 21 | # Format error response and append status code. |
19 | 22 | def handle_error(error, status_code): |
| 23 | + """Handles the errors |
| 24 | + """ |
20 | 25 | resp = jsonify(error) |
21 | 26 | resp.status_code = status_code |
22 | 27 | return resp |
23 | 28 |
|
| 29 | +def get_token_auth_header(): |
| 30 | + """Obtains the access token from the Authorization Header |
| 31 | + """ |
| 32 | + auth = request.headers.get("Authorization", None) |
| 33 | + if not auth: |
| 34 | + return handle_error({"code": "authorization_header_missing", |
| 35 | + "description": |
| 36 | + "Authorization header is expected"}, 401) |
| 37 | + |
| 38 | + parts = auth.split() |
| 39 | + |
| 40 | + if parts[0].lower() != "bearer": |
| 41 | + return handle_error({"code": "invalid_header", |
| 42 | + "description": |
| 43 | + "Authorization header must start with" |
| 44 | + "Bearer"}, 401) |
| 45 | + elif len(parts) == 1: |
| 46 | + return handle_error({"code": "invalid_header", |
| 47 | + "description": "Token not found"}, 401) |
| 48 | + elif len(parts) > 2: |
| 49 | + return handle_error({"code": "invalid_header", |
| 50 | + "description": "Authorization header must be" |
| 51 | + "Bearer token"}, 401) |
| 52 | + |
| 53 | + token = parts[1] |
| 54 | + return token |
| 55 | + |
| 56 | +def requires_scope(required_scope): |
| 57 | + """Determines if the required scope is present in the access token |
| 58 | + Args: |
| 59 | + required_scope (str): The scope required to access the resource |
| 60 | + """ |
| 61 | + token = get_token_auth_header() |
| 62 | + unverified_claims = jwt.get_unverified_claims(token) |
| 63 | + token_scopes = unverified_claims["scope"].split() |
| 64 | + for token_scope in token_scopes: |
| 65 | + if token_scope == required_scope: |
| 66 | + return True |
| 67 | + return False |
24 | 68 |
|
25 | 69 | def requires_auth(f): |
| 70 | + """Determines if the access token is valid |
| 71 | + """ |
26 | 72 | @wraps(f) |
27 | 73 | def decorated(*args, **kwargs): |
28 | | - auth = request.headers.get('Authorization', None) |
29 | | - if not auth: |
30 | | - return handle_error({'code': 'authorization_header_missing', |
31 | | - 'description': |
32 | | - 'Authorization header is expected'}, 401) |
33 | | - |
34 | | - parts = auth.split() |
35 | | - |
36 | | - if parts[0].lower() != 'bearer': |
37 | | - return handle_error({'code': 'invalid_header', |
38 | | - 'description': |
39 | | - 'Authorization header must start with' |
40 | | - 'Bearer'}, 401) |
41 | | - elif len(parts) == 1: |
42 | | - return handle_error({'code': 'invalid_header', |
43 | | - 'description': 'Token not found'}, 401) |
44 | | - elif len(parts) > 2: |
45 | | - return handle_error({'code': 'invalid_header', |
46 | | - 'description': 'Authorization header must be' |
47 | | - 'Bearer + \s + token'}, 401) |
48 | | - |
49 | | - token = parts[1] |
50 | | - jsonurl = urllib.urlopen('https://'+auth0_domain+'/.well-known/jwks.json') |
| 74 | + token = get_token_auth_header() |
| 75 | + jsonurl = urllib.urlopen("https://"+AUTH0_DOMAIN+"/.well-known/jwks.json") |
51 | 76 | jwks = json.loads(jsonurl.read()) |
52 | 77 | unverified_header = jwt.get_unverified_header(token) |
53 | 78 | rsa_key = {} |
54 | | - for key in jwks['keys']: |
55 | | - if key['kid'] == unverified_header['kid']: |
| 79 | + for key in jwks["keys"]: |
| 80 | + if key["kid"] == unverified_header["kid"]: |
56 | 81 | rsa_key = { |
57 | | - 'kty': key['kty'], |
58 | | - 'kid': key['kid'], |
59 | | - 'use': key['use'], |
60 | | - 'n': key['n'], |
61 | | - 'e': key['e'] |
| 82 | + "kty": key["kty"], |
| 83 | + "kid": key["kid"], |
| 84 | + "use": key["use"], |
| 85 | + "n": key["n"], |
| 86 | + "e": key["e"] |
62 | 87 | } |
63 | 88 | if rsa_key: |
64 | 89 | try: |
65 | 90 | payload = jwt.decode( |
66 | 91 | token, |
67 | 92 | rsa_key, |
68 | | - algorithms=unverified_header['alg'], |
69 | | - audience=api_audience, |
70 | | - issuer='https://'+auth0_domain+'/' |
| 93 | + algorithms=unverified_header["alg"], |
| 94 | + audience=API_AUDIENCE, |
| 95 | + issuer="https://"+AUTH0_DOMAIN+"/" |
71 | 96 | ) |
72 | 97 | except jwt.ExpiredSignatureError: |
73 | | - return handle_error({'code': 'token_expired', |
74 | | - 'description': 'token is expired'}, 401) |
| 98 | + return handle_error({"code": "token_expired", |
| 99 | + "description": "token is expired"}, 401) |
75 | 100 | except jwt.JWTClaimsError: |
76 | | - return handle_error({'code': 'invalid_claims', |
77 | | - 'description': 'incorrect claims, please check the audience and issuer'}, 401) |
| 101 | + return handle_error({"code": "invalid_claims", |
| 102 | + "description": "incorrect claims," |
| 103 | + "please check the audience and issuer"}, 401) |
78 | 104 | except Exception: |
79 | | - return handle_error({'code': 'invalid_header', |
80 | | - 'description': 'Unable to parse authentication' |
81 | | - ' token.'}, 400) |
| 105 | + return handle_error({"code": "invalid_header", |
| 106 | + "description": "Unable to parse authentication" |
| 107 | + "token."}, 400) |
82 | 108 |
|
83 | 109 | _app_ctx_stack.top.current_user = payload |
84 | 110 | return f(*args, **kwargs) |
85 | | - return handle_error({'code': 'invalid_header', |
86 | | - 'description': 'Unable to find appropriate key'}, 400) |
| 111 | + return handle_error({"code": "invalid_header", |
| 112 | + "description": "Unable to find appropriate key"}, 400) |
87 | 113 | return decorated |
88 | 114 |
|
89 | | - |
90 | 115 | # Controllers API |
91 | | -@app.route("/ping") |
92 | | -@cross_origin(headers=['Content-Type', 'Authorization']) |
| 116 | +@APP.route("/ping") |
| 117 | +@cross_origin(headers=["Content-Type", "Authorization"]) |
93 | 118 | def ping(): |
| 119 | + """No access token required to access this route |
| 120 | + """ |
94 | 121 | return "All good. You don't need to be authenticated to call this" |
95 | 122 |
|
96 | 123 |
|
97 | | -@app.route("/secured/ping") |
98 | | -@cross_origin(headers=['Content-Type', 'Authorization']) |
99 | | -@cross_origin(headers=['Access-Control-Allow-Origin', '*']) |
| 124 | +@APP.route("/secured/ping") |
| 125 | +@cross_origin(headers=["Content-Type", "Authorization"]) |
| 126 | +@cross_origin(headers=["Access-Control-Allow-Origin", "*"]) |
100 | 127 | @requires_auth |
101 | | -def securedPing(): |
| 128 | +def secured_ping(): |
| 129 | + """A valid access token is required to access this route |
| 130 | + """ |
102 | 131 | return "All good. You only get this message if you're authenticated" |
103 | 132 |
|
| 133 | +@APP.route("/secured/private/ping") |
| 134 | +@cross_origin(headers=["Content-Type", "Authorization"]) |
| 135 | +@cross_origin(headers=["Access-Control-Allow-Origin", "*"]) |
| 136 | +@requires_auth |
| 137 | +def secured_private_ping(): |
| 138 | + """A valid access token and an appropriate scope are required to access this route |
| 139 | + """ |
| 140 | + if requires_scope("read:agenda"): |
| 141 | + return "All good. You're authenticated and the access token has the appropriate scope" |
| 142 | + return "You don't have access to this resource" |
| 143 | + |
104 | 144 |
|
105 | 145 | if __name__ == "__main__": |
106 | | - app.run(host='0.0.0.0', port=env.get('PORT', 3001)) |
| 146 | + APP.run(host="0.0.0.0", port=env.get("PORT", 3001)) |
0 commit comments