documentation, cleanup
This commit is contained in:
95
auth.py
95
auth.py
@@ -1,23 +1,38 @@
|
||||
"""
|
||||
Authentication module.
|
||||
Its main export is the `BBPermission` class, used in `permission_classes=[..., BBPermission]`
|
||||
Further information: [strawberry permssions](https://strawberry.rocks/docs/guides/permissions)
|
||||
"""
|
||||
|
||||
# stdlib imports
|
||||
from functools import cached_property
|
||||
import logging
|
||||
import typing
|
||||
|
||||
# dependencies imports
|
||||
import httpx
|
||||
|
||||
from jose import jws, jwt
|
||||
import strawberry
|
||||
|
||||
from strawberry.permission import BasePermission
|
||||
from strawberry.fastapi import BaseContext
|
||||
from strawberry.types import Info as _Info
|
||||
from strawberry.types.info import RootValueType
|
||||
|
||||
# app imports
|
||||
import consts
|
||||
import config
|
||||
|
||||
# valid signing algorithms. This mostly serves to avoid the 'none' exploit, add more algorithms as necessary
|
||||
|
||||
# valid signing algorithms.
|
||||
# This mostly serves to avoid the 'none' exploit, add more algorithms as necessary
|
||||
# see README.md for further discussion
|
||||
VALID_ALGS = ['HS256', 'RS256']
|
||||
|
||||
logger = logging.getLogger(consts.LOG_ROOT)
|
||||
PERMISSIONS_KEY = config.str_val('AUTH_PERMISSIONS_KEY')
|
||||
OLS_PREFIX = config.str_val('AUTH_OLS_PREFIX')
|
||||
|
||||
logger = logging.getLogger(consts.LOG_ROOT)
|
||||
logger.debug('loading JWKS data')
|
||||
|
||||
# fetch IdP configuration, extract JWKS url from there, and load its contents
|
||||
@@ -33,51 +48,79 @@ jwks = httpx.get(jwks_url).json()
|
||||
|
||||
|
||||
class Context(BaseContext):
|
||||
"""
|
||||
Context class that adds JWT data (in bearer token form)
|
||||
"""
|
||||
@cached_property
|
||||
def token(self) -> str | None:
|
||||
"""
|
||||
JWT bearer token property
|
||||
"""
|
||||
# no request means we cannot access the Authorization header -> token is None
|
||||
if not self.request:
|
||||
return None
|
||||
|
||||
# extract bearer token data from header, if available
|
||||
token = self.request.headers.get("Authorization", None)
|
||||
if token:
|
||||
token = token.split("Bearer ")[1]
|
||||
return token
|
||||
|
||||
|
||||
# define Info class that contains our additional Context data
|
||||
# this class is automatically populated when part of a corresponding method signature,
|
||||
# e.g. `has_permission` or `@strawberry.___`
|
||||
Info = _Info[Context, RootValueType]
|
||||
|
||||
|
||||
async def get_context() -> Context:
|
||||
"""
|
||||
helper method to asynchronously access context data
|
||||
"""
|
||||
return Context()
|
||||
|
||||
|
||||
class SecurityException(Exception):
|
||||
pass
|
||||
class BBPermission(BasePermission):
|
||||
"""
|
||||
Generic permission check that checks the operation/field name against a list of
|
||||
permissions supplied in the JWT's claims under `config.AUTH_PERMISSIONS_KEY`. The
|
||||
permissions are prefixed with `config.AUTH_OLS_PREFIX`.
|
||||
|
||||
See `README.md` for further information.
|
||||
"""
|
||||
message = "Permission denied"
|
||||
|
||||
def validate_permissions(token: str | None, method_name: str) -> bool:
|
||||
if token is None:
|
||||
logger.info("no token")
|
||||
return False
|
||||
def has_permission(self, source: typing.Any, info: Info, **kwargs) -> bool:
|
||||
operation = info.field_name
|
||||
token = info.context.token
|
||||
|
||||
headers = jwt.get_unverified_headers(token)
|
||||
logger.debug('headers: %s', headers)
|
||||
# no token present -> permission denied
|
||||
if token is None:
|
||||
logger.info("no token")
|
||||
return False
|
||||
|
||||
claims = jwt.get_unverified_claims(token)
|
||||
logger.debug('claims: %s', claims)
|
||||
headers = jwt.get_unverified_headers(token)
|
||||
logger.debug('headers: %s', headers)
|
||||
|
||||
permissions = claims.get("basebox/permissions", [])
|
||||
algorithm = headers.get('alg')
|
||||
if not algorithm in VALID_ALGS:
|
||||
logger.info('Invalid signing algorithm: %s', algorithm)
|
||||
return False
|
||||
claims = jwt.get_unverified_claims(token)
|
||||
logger.debug('claims: %s', claims)
|
||||
|
||||
token_valid = jws.verify(token=token, key=jwks, algorithms=algorithm)
|
||||
if not token_valid:
|
||||
logger.info("invalid token")
|
||||
return False
|
||||
permissions = claims.get("basebox/permissions", [])
|
||||
algorithm = headers.get('alg')
|
||||
# invalid algorithm -> permission denied
|
||||
if not algorithm in VALID_ALGS:
|
||||
logger.info('Invalid signing algorithm: %s', algorithm)
|
||||
return False
|
||||
|
||||
namespaced_method = f'{OLS_PREFIX}{method_name}'
|
||||
logger.debug("verify %s <> %s", namespaced_method, permissions)
|
||||
has_permission = namespaced_method in permissions
|
||||
return has_permission
|
||||
token_valid = jws.verify(token=token, key=jwks, algorithms=algorithm)
|
||||
# token verification failed -> permission denied
|
||||
if not token_valid:
|
||||
logger.info("invalid token")
|
||||
return False
|
||||
|
||||
# check whether the requested operation is in the list of valid operations
|
||||
namespaced_method = f'{OLS_PREFIX}{operation}'
|
||||
logger.debug("verify required permission: %s against claims: %s",
|
||||
namespaced_method, permissions)
|
||||
has_permission = namespaced_method in permissions
|
||||
return has_permission
|
||||
|
Reference in New Issue
Block a user