Files
periodic-table/backend/src/controller.py

191 lines
6.0 KiB
Python

from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import RedirectResponse, JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from src.models import TokenResponse, UserInfo
from src.service import AuthService, UserService
from src.config import get_settings
from src.database import get_db
# Initialize HTTPBearer security dependency
bearer_scheme = HTTPBearer()
# Get settings
settings = get_settings()
class AuthController:
"""
Controller for handling authentication logic.
"""
@staticmethod
def read_root():
"""
Root endpoint providing basic information and documentation link.
Returns:
dict: A welcome message and link to the documentation.
"""
return {
"message": (
"Welcome to the Keycloak authentication system. "
"Use the /api/login endpoint to authenticate and /api/auth/me "
"endpoint to access the authenticated user information."
),
"documentation": "/docs",
}
@staticmethod
async def login(keycode: str, request: Request, db: AsyncSession) -> RedirectResponse:
"""
Authenticate user, provision in database if needed, set HTTP-only cookie,
and redirect to frontend.
Args:
keycode (str): The authorization code from Keycloak.
request (Request): The FastAPI request object.
db (AsyncSession): Database session for user provisioning.
Raises:
HTTPException: If the authentication fails.
Returns:
RedirectResponse: Redirects to frontend with cookie set.
"""
# Authenticate the user using the AuthService
token_response = AuthService.authenticate_user(keycode, request)
access_token = token_response.get("access_token")
if not access_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication failed",
)
# Provision user in database (JIT provisioning)
# This creates the user if they don't exist
user_id, is_new_user = await UserService.provision_user_on_login(access_token, db)
# Create redirect response to frontend
response = RedirectResponse(
url=f"{settings.frontend_url}/dashboard",
status_code=status.HTTP_302_FOUND
)
# Set HTTP-only cookie with the access token
response.set_cookie(
key=settings.cookie_name,
value=access_token,
httponly=True,
secure=settings.cookie_secure,
samesite=settings.cookie_samesite,
max_age=settings.cookie_max_age,
domain=settings.cookie_domain,
path="/",
)
return response
@staticmethod
def get_current_user(request: Request) -> UserInfo:
"""
Get the current authenticated user from the session cookie.
Args:
request (Request): The FastAPI request object.
Raises:
HTTPException: If no valid session cookie exists.
Returns:
UserInfo: Information about the authenticated user.
"""
# Extract the token from the cookie
token = request.cookies.get(settings.cookie_name)
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
# Verify the token and get user information
user_info = AuthService.verify_token(token)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired session",
headers={"WWW-Authenticate": "Bearer"},
)
return user_info
@staticmethod
def logout() -> JSONResponse:
"""
Logout the user by clearing the authentication cookie and returning
the Keycloak logout URL for full session termination.
Returns:
JSONResponse: Contains the Keycloak logout URL and clears the cookie.
"""
# Build Keycloak logout URL
keycloak_logout_url = (
f"{settings.keycloak_external_url}realms/{settings.keycloak_realm}"
f"/protocol/openid-connect/logout"
f"?client_id={settings.keycloak_client_id}"
f"&post_logout_redirect_uri={settings.frontend_url}"
)
response = JSONResponse(
content={
"message": "Successfully logged out",
"logout_url": keycloak_logout_url
},
status_code=status.HTTP_200_OK
)
# Clear the authentication cookie
response.delete_cookie(
key=settings.cookie_name,
path="/",
domain=settings.cookie_domain,
)
return response
@staticmethod
def protected_endpoint(
credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> UserInfo:
"""
Access a protected resource that requires valid token authentication.
Args:
credentials (HTTPAuthorizationCredentials): Bearer token provided
via HTTP Authorization header.
Raises:
HTTPException: If the token is invalid or not provided.
Returns:
UserInfo: Information about the authenticated user.
"""
# Extract the bearer token from the provided credentials
token = credentials.credentials
# Verify the token and get user information
user_info = AuthService.verify_token(token)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
return user_info