initial commit

This commit is contained in:
gulimabr
2025-11-28 12:33:37 -03:00
commit 5da54393ff
42 changed files with 3251 additions and 0 deletions

0
backend/src/__init__.py Normal file
View File

49
backend/src/config.py Normal file
View File

@@ -0,0 +1,49 @@
from pydantic_settings import BaseSettings
from pydantic import Field
from keycloak import KeycloakOpenID
class Settings(BaseSettings):
# Keycloak settings
keycloak_server_url: str = Field(..., env="KEYCLOAK_SERVER_URL")
keycloak_external_url: str = Field(..., env="KEYCLOAK_EXTERNAL_URL")
keycloak_realm: str = Field(..., env="KEYCLOAK_REALM")
keycloak_client_id: str = Field(..., env="KEYCLOAK_CLIENT_ID")
keycloak_client_secret: str = Field(..., env="KEYCLOAK_CLIENT_SECRET")
# Frontend settings
frontend_url: str = Field(default="http://localhost:3000", env="FRONTEND_URL")
# Cookie settings
cookie_secure: bool = Field(default=False, env="COOKIE_SECURE")
cookie_samesite: str = Field(default="lax", env="COOKIE_SAMESITE")
cookie_domain: str | None = Field(default=None, env="COOKIE_DOMAIN")
cookie_max_age: int = Field(default=3600, env="COOKIE_MAX_AGE")
cookie_name: str = Field(default="access_token", env="COOKIE_NAME")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
keycloak_openid = KeycloakOpenID(
server_url=settings.keycloak_server_url,
realm_name=settings.keycloak_realm,
client_id=settings.keycloak_client_id,
client_secret_key=settings.keycloak_client_secret,
verify=False
)
def get_openid_config():
return keycloak_openid.well_known()
def get_openid():
return keycloak_openid
def get_settings():
return settings

169
backend/src/controller.py Normal file
View File

@@ -0,0 +1,169 @@
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import RedirectResponse, JSONResponse
from src.models import TokenResponse, UserInfo
from src.service import AuthService
from src.config import get_settings
# Initialize HTTPBearer security dependency
bearer_scheme = HTTPBearer()
# Get settings
settings = get_settings()
class AuthController:
"""
Controller for handling authentication logic.
"""
@staticmethod
def read_root():
"""
Root endpoint providing basic information and documentation link.
Returns:
dict: A welcome message and link to the documentation.
"""
return {
"message": (
"Welcome to the Keycloak authentication system. "
"Use the /api/login endpoint to authenticate and /api/auth/me "
"endpoint to access the authenticated user information."
),
"documentation": "/docs",
}
@staticmethod
def login(keycode: str, request: Request) -> RedirectResponse:
"""
Authenticate user, set HTTP-only cookie, and redirect to frontend.
Args:
keycode (str): The authorization code from Keycloak.
request (Request): The FastAPI request object.
Raises:
HTTPException: If the authentication fails.
Returns:
RedirectResponse: Redirects to frontend with cookie set.
"""
# Authenticate the user using the AuthService
access_token = AuthService.authenticate_user(keycode, request)
if not access_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication failed",
)
# Create redirect response to frontend
response = RedirectResponse(
url=f"{settings.frontend_url}/dashboard",
status_code=status.HTTP_302_FOUND
)
# Set HTTP-only cookie with the access token
response.set_cookie(
key=settings.cookie_name,
value=access_token,
httponly=True,
secure=settings.cookie_secure,
samesite=settings.cookie_samesite,
max_age=settings.cookie_max_age,
domain=settings.cookie_domain,
path="/",
)
return response
@staticmethod
def get_current_user(request: Request) -> UserInfo:
"""
Get the current authenticated user from the session cookie.
Args:
request (Request): The FastAPI request object.
Raises:
HTTPException: If no valid session cookie exists.
Returns:
UserInfo: Information about the authenticated user.
"""
# Extract the token from the cookie
token = request.cookies.get(settings.cookie_name)
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
# Verify the token and get user information
user_info = AuthService.verify_token(token)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired session",
headers={"WWW-Authenticate": "Bearer"},
)
return user_info
@staticmethod
def logout() -> JSONResponse:
"""
Logout the user by clearing the authentication cookie.
Returns:
JSONResponse: Success message with cookie cleared.
"""
response = JSONResponse(
content={"message": "Successfully logged out"},
status_code=status.HTTP_200_OK
)
# Clear the authentication cookie
response.delete_cookie(
key=settings.cookie_name,
path="/",
domain=settings.cookie_domain,
)
return response
@staticmethod
def protected_endpoint(
credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> UserInfo:
"""
Access a protected resource that requires valid token authentication.
Args:
credentials (HTTPAuthorizationCredentials): Bearer token provided
via HTTP Authorization header.
Raises:
HTTPException: If the token is invalid or not provided.
Returns:
UserInfo: Information about the authenticated user.
"""
# Extract the bearer token from the provided credentials
token = credentials.credentials
# Verify the token and get user information
user_info = AuthService.verify_token(token)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
return user_info

118
backend/src/main.py Normal file
View File

@@ -0,0 +1,118 @@
from fastapi import FastAPI, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import RedirectResponse
from src.models import TokenResponse, UserInfo
from src.controller import AuthController
from src.config import get_openid, get_settings
# Initialize the FastAPI app
app = FastAPI(title="Keycloak Auth API", version="1.0.0")
# Get settings
settings = get_settings()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=[settings.frontend_url],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize the HTTPBearer scheme for authentication
bearer_scheme = HTTPBearer()
# Configure client
keycloak_openid = get_openid()
# Define the root endpoint
@app.get("/api")
async def read_root():
"""
Root endpoint that provides a welcome message and documentation link.
"""
return AuthController.read_root()
# Define the login endpoint
@app.get("/api/login", response_class=RedirectResponse)
async def login(request: Request):
"""
Login endpoint to authenticate the user and return an access token.
Returns:
RedirectResponse: Contains the redirect URL upon successful authentication.
"""
# Build the callback URI using the frontend URL (accessible from browser)
# This ensures the redirect works correctly through nginx proxy
redirect_uri = f"{settings.frontend_url}/api/callback"
# Construct the authorization URL with external Keycloak URL
auth_url = (
f"{settings.keycloak_external_url}realms/{settings.keycloak_realm}"
f"/protocol/openid-connect/auth"
f"?client_id={settings.keycloak_client_id}"
f"&response_type=code"
f"&redirect_uri={redirect_uri}"
f"&scope=openid%20profile%20email"
)
return RedirectResponse(auth_url)
# Define the callback endpoint
@app.get("/api/callback", include_in_schema=False)
async def callback(request: Request):
"""
OAuth callback endpoint that exchanges the authorization code for a token
and sets it as an HTTP-only cookie.
"""
# Extract the code from the URL
keycode = request.query_params.get('code')
return AuthController.login(str(keycode), request)
# Define the auth/me endpoint to get current user from cookie
@app.get("/api/auth/me", response_model=UserInfo)
async def get_current_user(request: Request):
"""
Get the current authenticated user from the session cookie.
Returns:
UserInfo: Information about the authenticated user.
"""
return AuthController.get_current_user(request)
# Define the logout endpoint
@app.post("/api/auth/logout")
async def logout(request: Request):
"""
Logout endpoint that clears the authentication cookie.
Returns:
dict: Success message.
"""
return AuthController.logout()
# Define the protected endpoint (kept for API token-based access)
@app.get("/api/protected", response_model=UserInfo)
async def protected_endpoint(
credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
):
"""
Protected endpoint that requires a valid token for access.
Args:
credentials (HTTPAuthorizationCredentials):
Bearer token provided via HTTP Authorization header.
Returns:
UserInfo: Information about the authenticated user.
"""
return AuthController.protected_endpoint(credentials)

18
backend/src/models.py Normal file
View File

@@ -0,0 +1,18 @@
from typing import Optional
from pydantic import BaseModel, SecretStr
class TokenRequest(BaseModel):
username: str
password: SecretStr
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
class UserInfo(BaseModel):
preferred_username: str
email: Optional[str] = None
full_name: Optional[str] = None

91
backend/src/service.py Normal file
View File

@@ -0,0 +1,91 @@
from fastapi import HTTPException, status, Request
from keycloak.exceptions import KeycloakAuthenticationError, KeycloakPostError
from keycloak import KeycloakOpenID
from src.config import get_settings
from src.models import UserInfo
import logging
logger = logging.getLogger(__name__)
settings = get_settings()
# Create a fresh KeycloakOpenID instance for token exchange
def get_keycloak_openid():
return KeycloakOpenID(
server_url=settings.keycloak_server_url,
realm_name=settings.keycloak_realm,
client_id=settings.keycloak_client_id,
client_secret_key=settings.keycloak_client_secret,
verify=False
)
class AuthService:
@staticmethod
def authenticate_user(keycode: str, request: Request) -> str:
"""
Authenticate the user using Keycloak and return an access token.
"""
try:
# Use the same redirect_uri that was used in the login endpoint
redirect_uri = f"{settings.frontend_url}/api/callback"
logger.info(f"=== Token Exchange Debug ===")
logger.info(f"Keycloak Server URL: {settings.keycloak_server_url}")
logger.info(f"Realm: {settings.keycloak_realm}")
logger.info(f"Client ID: {settings.keycloak_client_id}")
logger.info(f"Client Secret (first 5 chars): {settings.keycloak_client_secret[:5]}...")
logger.info(f"Redirect URI: {redirect_uri}")
logger.info(f"Auth Code (first 10 chars): {keycode[:10]}...")
# Get fresh KeycloakOpenID instance
keycloak_openid = get_keycloak_openid()
token = keycloak_openid.token(
grant_type='authorization_code',
code=keycode,
redirect_uri=redirect_uri,
)
logger.info("Token exchange successful")
return token["access_token"]
except KeycloakAuthenticationError as exc:
logger.error(f"KeycloakAuthenticationError: {exc}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid Login: {str(exc)}",
) from exc
except KeycloakPostError as exc:
logger.error(f"KeycloakPostError: {exc}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid Grant: {str(exc)}",
) from exc
except Exception as exc:
logger.error(f"Unexpected error during token exchange: {exc}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Token exchange failed: {str(exc)}",
) from exc
@staticmethod
def verify_token(token: str) -> UserInfo:
"""
Verify the given token and return user information.
"""
try:
keycloak_openid = get_keycloak_openid()
user_info = keycloak_openid.userinfo(token)
print(user_info)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
)
return UserInfo(
preferred_username=user_info["preferred_username"],
email=user_info.get("email"),
full_name=user_info.get("name"),
)
except KeycloakAuthenticationError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
) from exc