Files
periodic-table/backend/src/service.py

157 lines
5.7 KiB
Python

from fastapi import HTTPException, status, Request
from keycloak.exceptions import KeycloakAuthenticationError, KeycloakPostError
from keycloak import KeycloakOpenID
from sqlalchemy.ext.asyncio import AsyncSession
from src.config import get_settings
from src.models import UserInfo
from src.repositories import UserRepository
import logging
logger = logging.getLogger(__name__)
settings = get_settings()
# Create a fresh KeycloakOpenID instance for token exchange
def get_keycloak_openid():
    """Build a new ``KeycloakOpenID`` client from the application settings.

    A new client object is constructed on every call; nothing is cached
    in this function.
    """
    client_options = {
        "server_url": settings.keycloak_server_url,
        "realm_name": settings.keycloak_realm,
        "client_id": settings.keycloak_client_id,
        "client_secret_key": settings.keycloak_client_secret,
        # NOTE(review): verify=False presumably turns off TLS certificate
        # verification for calls to Keycloak - confirm this is intended
        # outside local development.
        "verify": False,
    }
    return KeycloakOpenID(**client_options)
class AuthService:
    """Keycloak-backed authentication helpers.

    Every method is static and obtains its own client through
    ``get_keycloak_openid()``.
    """

    @staticmethod
    def authenticate_user(keycode: str, request: Request) -> dict:
        """
        Exchange an authorization code for the full Keycloak token response.

        Args:
            keycode: Authorization code received on the login callback.
            request: Incoming request. Not used by this method; kept so
                existing callers do not have to change.

        Returns:
            The token dict returned by Keycloak, so callers can read the
            access_token from it.

        Raises:
            HTTPException: 401 on KeycloakAuthenticationError, 400 on
                KeycloakPostError, 500 on any other failure.
        """
        try:
            # Use the same redirect_uri that was used in the login endpoint
            redirect_uri = f"{settings.frontend_url}/api/callback"

            # Only non-secret configuration is logged. The client secret and
            # the authorization code are credentials and must never reach
            # the logs, not even as a prefix.
            logger.info("=== Token Exchange Debug ===")
            logger.info("Keycloak Server URL: %s", settings.keycloak_server_url)
            logger.info("Realm: %s", settings.keycloak_realm)
            logger.info("Client ID: %s", settings.keycloak_client_id)
            logger.info("Redirect URI: %s", redirect_uri)

            # Get fresh KeycloakOpenID instance
            keycloak_openid = get_keycloak_openid()
            token = keycloak_openid.token(
                grant_type='authorization_code',
                code=keycode,
                redirect_uri=redirect_uri,
            )
            logger.info("Token exchange successful")
            return token
        except KeycloakAuthenticationError as exc:
            logger.error("KeycloakAuthenticationError: %s", exc)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Invalid Login: {str(exc)}",
            ) from exc
        except KeycloakPostError as exc:
            logger.error("KeycloakPostError: %s", exc)
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid Grant: {str(exc)}",
            ) from exc
        except Exception as exc:
            # Boundary handler: keep the traceback in the log so unexpected
            # failures can be diagnosed.
            logger.exception("Unexpected error during token exchange: %s", exc)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Token exchange failed: {str(exc)}",
            ) from exc

    @staticmethod
    def verify_token(token: str) -> UserInfo:
        """
        Verify the given token and return user information.

        Args:
            token: Access token to present to Keycloak's userinfo endpoint.

        Returns:
            A UserInfo built from the userinfo response.

        Raises:
            HTTPException: 401 if Keycloak rejects the token or returns an
                empty userinfo response.
        """
        # Keep the try body to the one call that can raise the caught error.
        try:
            keycloak_openid = get_keycloak_openid()
            user_info = keycloak_openid.userinfo(token)
        except KeycloakAuthenticationError as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials",
            ) from exc

        # The user profile is deliberately not printed or logged here.
        if not user_info:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
            )
        return UserInfo(
            sub=user_info.get("sub"),
            # NOTE(review): indexing raises KeyError when the claim is absent,
            # unlike the .get() lookups around it - confirm whether
            # preferred_username is guaranteed by the realm configuration.
            preferred_username=user_info["preferred_username"],
            email=user_info.get("email"),
            full_name=user_info.get("name"),
        )

    @staticmethod
    def decode_token(token: str) -> dict:
        """
        Decode the access token and return its claims.

        Validation is requested from the Keycloak client (validate=True).
        Used to get the 'sub' claim for user provisioning.

        Args:
            token: Access token to decode.

        Returns:
            The decoded claims.

        Raises:
            HTTPException: 401 if the token cannot be decoded for any reason.
        """
        try:
            keycloak_openid = get_keycloak_openid()
            # validate=True asks the client to validate the token rather than
            # only parse it.
            token_info = keycloak_openid.decode_token(
                token,
                validate=True
            )
            return token_info
        except Exception as exc:
            logger.error("Error decoding token: %s", exc)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not decode token",
            ) from exc
class UserService:
    """User-related operations built on top of the user repository."""

    @staticmethod
    async def provision_user_on_login(
        token: str,
        db: AsyncSession
    ) -> tuple[int, bool]:
        """
        Just-in-time provisioning: look up the user for this token's
        subject, creating the database row if it does not exist yet.

        Args:
            token: The access token from Keycloak
            db: Database session

        Returns:
            Tuple of (user_id, is_new_user)

        Raises:
            HTTPException: 400 if the decoded token has no 'sub' claim.
                Errors raised by AuthService.decode_token propagate as-is.
        """
        claims = AuthService.decode_token(token)

        # The subject identifier is the key used for the repository lookup.
        if not (sub := claims.get("sub")):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Token does not contain 'sub' claim"
            )

        user, is_new = await UserRepository(db).get_or_create_user(sub)

        if not is_new:
            logger.debug(f"Existing user logged in: {sub} -> user_id: {user.id}")
        else:
            logger.info(f"New user provisioned: {sub} -> user_id: {user.id}")

        return user.id, is_new