from contextlib import asynccontextmanager from typing import List, Optional from fastapi import FastAPI, Depends, Request, HTTPException, status from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.responses import RedirectResponse from sqlalchemy.ext.asyncio import AsyncSession from src.models import ( TokenResponse, UserInfo, GroupResponse, TagResponse, RequirementResponse, PriorityResponse, RequirementCreateRequest, RequirementUpdateRequest, RequirementHistoryResponse, ProjectResponse, ProjectCreateRequest, ProjectUpdateRequest, ProjectMemberRequest, ValidationStatusResponse, ValidationHistoryResponse, ValidationCreateRequest, RelationshipTypeResponse, RelationshipTypeCreateRequest, RelationshipTypeUpdateRequest, RequirementLinkResponse, RequirementLinkCreateRequest, RequirementSearchResult, RequirementLinkHistoryResponse, RequirementGroupHistoryResponse, CurrentRequirementGroupResponse, RoleResponse, ProjectMemberResponse, UserRoleUpdateRequest, ROLE_DISPLAY_NAMES, CommentResponse, CommentReplyResponse, CommentCreateRequest, ReplyCreateRequest, RequirementStatusResponse, DeletedRequirementResponse, UserCreateRequest, UserCreateResponse, SystemUserResponse, SystemProjectResponse, SystemProjectMemberResponse, AssignUserToProjectRequest, SystemUserCreateRequest ) from src.controller import AuthController from src.config import get_openid, get_settings from src.database import init_db, close_db, get_db from src.repositories import ( RoleRepository, GroupRepository, TagRepository, RequirementRepository, PriorityRepository, ProjectRepository, ValidationStatusRepository, ValidationRepository, RelationshipTypeRepository, RequirementLinkRepository, CommentRepository, ReplyRepository, RequirementStatusRepository, UserRepository ) from src.service import KeycloakAdminService import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Get settings settings = get_settings() @asynccontextmanager async def lifespan(app: FastAPI): """ Application lifespan manager. Handles startup and shutdown events. """ # Startup logger.info("Starting up application...") logger.info("Initializing database...") await init_db() logger.info("Database initialized successfully") # Ensure default roles exist from src.database import AsyncSessionLocal async with AsyncSessionLocal() as session: role_repo = RoleRepository(session) await role_repo.ensure_default_roles_exist() await session.commit() logger.info("Default roles ensured") # Ensure default requirement statuses exist async with AsyncSessionLocal() as session: req_status_repo = RequirementStatusRepository(session) await req_status_repo.ensure_default_statuses_exist() await session.commit() logger.info("Default requirement statuses ensured") # Ensure default validation statuses exist async with AsyncSessionLocal() as session: await session.execute( __import__('sqlalchemy').text( """ INSERT INTO validation_statuses (id, status_name) VALUES (1, 'Approved'), (2, 'Denied'), (3, 'Partial'), (4, 'Not Validated') ON CONFLICT (id) DO NOTHING """ ) ) await session.commit() logger.info("Default validation statuses ensured") # Provision super admin if configured if settings.super_admin_username and settings.super_admin_email and settings.super_admin_password: logger.info(f"Super admin configuration found, provisioning user: {settings.super_admin_username}") try: # Check if super admin already exists in local DB async with AsyncSessionLocal() as session: user_repo = UserRepository(session) existing_user = await user_repo.get_by_username(settings.super_admin_username) if existing_user: logger.info(f"Super admin already exists in database: {existing_user.username} (id: {existing_user.id})") # Ensure they have super_admin role (role_id=6) if existing_user.role_id != 6: logger.info(f"Updating existing super admin to role_id=6") await user_repo.update_role(existing_user.id, 6) await session.commit() else: # Provision in Keycloak first keycloak_sub = await KeycloakAdminService.provision_super_admin( username=settings.super_admin_username, email=settings.super_admin_email, password=settings.super_admin_password ) # Create in local database with super_admin role (role_id=6) new_user = await user_repo.create( sub=keycloak_sub, role_id=6, # super_admin role username=settings.super_admin_username, full_name="Super Admin" ) await session.commit() logger.info(f"Super admin provisioned successfully: {new_user.username} (id: {new_user.id})") except Exception as e: logger.error(f"Failed to provision super admin: {e}") # Don't fail startup if super admin provisioning fails else: logger.info("No super admin configuration found, skipping provisioning") yield # Shutdown logger.info("Shutting down application...") await close_db() logger.info("Database connection closed") # Initialize the FastAPI app app = FastAPI( title="Keycloak Auth API", version="1.0.0", lifespan=lifespan ) # Configure CORS app.add_middleware( CORSMiddleware, allow_origins=[settings.frontend_url], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize the HTTPBearer scheme for authentication bearer_scheme = HTTPBearer() # Configure client keycloak_openid = get_openid() # Define the root endpoint @app.get("/api") async def read_root(): """ Root endpoint that provides a welcome message and documentation link. """ return AuthController.read_root() # Define the login endpoint @app.get("/api/login", response_class=RedirectResponse) async def login(request: Request): """ Login endpoint to authenticate the user and return an access token. Returns: RedirectResponse: Contains the redirect URL upon successful authentication. """ # Build the callback URI using the frontend URL (accessible from browser) # This ensures the redirect works correctly through nginx proxy redirect_uri = f"{settings.frontend_url}/api/callback" # Construct the authorization URL with external Keycloak URL auth_url = ( f"{settings.keycloak_external_url}realms/{settings.keycloak_realm}" f"/protocol/openid-connect/auth" f"?client_id={settings.keycloak_client_id}" f"&response_type=code" f"&redirect_uri={redirect_uri}" f"&scope=openid%20profile%20email" ) return RedirectResponse(auth_url) # Define the callback endpoint @app.get("/api/callback", include_in_schema=False) async def callback(request: Request, db: AsyncSession = Depends(get_db)): """ OAuth callback endpoint that exchanges the authorization code for a token, provisions the user in the database if needed, and sets it as an HTTP-only cookie. """ # Extract the code from the URL keycode = request.query_params.get('code') return await AuthController.login(str(keycode), request, db) # Define the auth/me endpoint to get current user from cookie @app.get("/api/auth/me", response_model=UserInfo) async def get_current_user(request: Request, db: AsyncSession = Depends(get_db)): """ Get the current authenticated user from the session cookie. Includes role information from the database. Returns: UserInfo: Information about the authenticated user including role. """ user_info = AuthController.get_current_user(request) # Fetch role information from database from src.repositories import UserRepository user_repo = UserRepository(db) db_user = await user_repo.get_by_sub(user_info.sub) if db_user: user_info.db_user_id = db_user.id user_info.role_id = db_user.role_id user_info.role = db_user.role.role_name if db_user.role else None return user_info # Define the logout endpoint @app.post("/api/auth/logout") async def logout(request: Request): """ Logout endpoint that clears the authentication cookie. Returns: dict: Success message. """ return AuthController.logout() # Define the token refresh endpoint @app.post("/api/auth/refresh") async def refresh_token(request: Request): """ Silently refresh the access token using the refresh token cookie. This should be called by the frontend before the access token expires. Returns: dict: Success status and new expiration time. """ return AuthController.refresh_token(request) # Define the protected endpoint (kept for API token-based access) @app.get("/api/protected", response_model=UserInfo) async def protected_endpoint( credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), ): """ Protected endpoint that requires a valid token for access. Args: credentials (HTTPAuthorizationCredentials): Bearer token provided via HTTP Authorization header. Returns: UserInfo: Information about the authenticated user. """ return AuthController.protected_endpoint(credentials) # =========================================== # Groups Endpoints # =========================================== @app.get("/api/groups", response_model=List[GroupResponse]) async def get_groups(db: AsyncSession = Depends(get_db)): """ Get all groups. Returns: List of all groups with their names and colors. """ group_repo = GroupRepository(db) groups = await group_repo.get_all() return [GroupResponse.model_validate(g) for g in groups] # =========================================== # Tags Endpoints # =========================================== @app.get("/api/tags", response_model=List[TagResponse]) async def get_tags(db: AsyncSession = Depends(get_db)): """ Get all tags. Returns: List of all tags with their codes and descriptions. """ tag_repo = TagRepository(db) tags = await tag_repo.get_all() return [TagResponse.model_validate(t) for t in tags] @app.get("/api/tags/{tag_id}", response_model=TagResponse) async def get_tag(tag_id: int, db: AsyncSession = Depends(get_db)): """ Get a specific tag by ID. Args: tag_id: The tag ID Returns: The tag if found. """ tag_repo = TagRepository(db) tag = await tag_repo.get_by_id(tag_id) if not tag: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Tag with id {tag_id} not found" ) return TagResponse.model_validate(tag) # =========================================== # Priorities Endpoints # =========================================== @app.get("/api/priorities", response_model=List[PriorityResponse]) async def get_priorities(db: AsyncSession = Depends(get_db)): """ Get all priorities. Returns: List of all priorities ordered by priority_num. """ priority_repo = PriorityRepository(db) priorities = await priority_repo.get_all() return [PriorityResponse.model_validate(p) for p in priorities] # =========================================== # Requirement Statuses Endpoints # =========================================== @app.get("/api/requirement-statuses", response_model=List[RequirementStatusResponse]) async def get_requirement_statuses(db: AsyncSession = Depends(get_db)): """ Get all requirement lifecycle statuses (Draft, Regular, etc.). Returns: List of all requirement statuses. """ status_repo = RequirementStatusRepository(db) statuses = await status_repo.get_all() return [RequirementStatusResponse.model_validate(s) for s in statuses] # =========================================== # Projects Endpoints # =========================================== async def _get_current_user_db(request: Request, db: AsyncSession): """Helper to get the current authenticated user from the database.""" user_info = AuthController.get_current_user(request) from src.repositories import UserRepository user_repo = UserRepository(db) user = await user_repo.get_by_sub(user_info.sub) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found in database" ) return user def _require_role(user, allowed_role_ids: List[int], action: str = "perform this action"): """ Helper to check if user has one of the allowed roles. Args: user: The database user object allowed_role_ids: List of role IDs that are permitted (e.g., [1, 3] for admin and user) action: Description of the action for error message Raises: HTTPException: 403 Forbidden if user's role is not in allowed list """ if user.role_id not in allowed_role_ids: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Your role does not have permission to {action}" ) def _get_display_name(user) -> str: """ Get the best display name for a user. Falls back in order: full_name -> username -> sub. Args: user: The database user object Returns: The best available display name for the user """ return user.full_name or user.username or user.sub async def _verify_project_membership(project_id: int, user_id: int, db: AsyncSession): """Helper to verify user is a member of a project.""" project_repo = ProjectRepository(db) # Check if project exists project = await project_repo.get_by_id(project_id) if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id {project_id} not found" ) # Check if user is a member is_member = await project_repo.is_member(project_id, user_id) if not is_member: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="You are not a member of this project" ) return project @app.get("/api/projects", response_model=List[ProjectResponse]) async def get_my_projects( request: Request, db: AsyncSession = Depends(get_db) ): """ Get all projects the authenticated user is a member of. Returns: List of projects the user belongs to. """ user = await _get_current_user_db(request, db) project_repo = ProjectRepository(db) projects = await project_repo.get_by_user_id(user.id) return [ProjectResponse.model_validate(p) for p in projects] @app.get("/api/projects/{project_id}", response_model=ProjectResponse) async def get_project( project_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get a specific project by ID. User must be a member of the project. Args: project_id: The project ID Returns: The project if found and user is a member. """ user = await _get_current_user_db(request, db) project = await _verify_project_membership(project_id, user.id, db) return ProjectResponse.model_validate(project) @app.post("/api/projects", response_model=ProjectResponse, status_code=status.HTTP_201_CREATED) async def create_project( request: Request, project_data: ProjectCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new project. The creating user will automatically be added as a member. Args: project_data: The project data Returns: The created project. """ user = await _get_current_user_db(request, db) project_repo = ProjectRepository(db) project = await project_repo.create( project_name=project_data.project_name, project_desc=project_data.project_desc, creator_id=user.id, ) await db.commit() return ProjectResponse.model_validate(project) @app.put("/api/projects/{project_id}", response_model=ProjectResponse) async def update_project( project_id: int, request: Request, project_data: ProjectUpdateRequest, db: AsyncSession = Depends(get_db) ): """ Update an existing project. User must be a member of the project. Args: project_id: The project ID to update project_data: The updated project data Returns: The updated project. """ user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, user.id, db) project_repo = ProjectRepository(db) project = await project_repo.update( project_id=project_id, project_name=project_data.project_name, project_desc=project_data.project_desc, ) await db.commit() return ProjectResponse.model_validate(project) @app.delete("/api/projects/{project_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_project( project_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Delete a project. User must be a member of the project. Args: project_id: The project ID to delete """ user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, user.id, db) project_repo = ProjectRepository(db) await project_repo.delete(project_id) await db.commit() @app.post("/api/projects/{project_id}/members", status_code=status.HTTP_201_CREATED) async def add_project_member( project_id: int, request: Request, member_data: ProjectMemberRequest, db: AsyncSession = Depends(get_db) ): """ Add a member to a project. User must be a member of the project. Args: project_id: The project ID member_data: The user to add Returns: Success message. """ user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, user.id, db) project_repo = ProjectRepository(db) added = await project_repo.add_member(project_id, member_data.user_id) if not added: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="User is already a member of this project" ) await db.commit() return {"message": "Member added successfully"} @app.delete("/api/projects/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def remove_project_member( project_id: int, user_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Remove a member from a project. User must be a member of the project. Args: project_id: The project ID user_id: The user ID to remove """ current_user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, current_user.id, db) project_repo = ProjectRepository(db) removed = await project_repo.remove_member(project_id, user_id) if not removed: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User is not a member of this project" ) await db.commit() # =========================================== # Admin Endpoints (Role Management, Project Admin) # =========================================== @app.get("/api/roles", response_model=List[RoleResponse]) async def get_all_roles( request: Request, db: AsyncSession = Depends(get_db) ): """ Get all available roles with their display names. Returns: List of roles with id, role_name, and display_name. """ # Ensure user is authenticated await _get_current_user_db(request, db) role_repo = RoleRepository(db) roles = await role_repo.get_all() return [RoleResponse.from_role(r) for r in roles] @app.get("/api/projects/{project_id}/members", response_model=List[ProjectMemberResponse]) async def get_project_members( project_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get all members of a project with their role information. User must be a member of the project. Args: project_id: The project ID Returns: List of project members with role info. """ user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, user.id, db) project_repo = ProjectRepository(db) members = await project_repo.get_members(project_id) return [ ProjectMemberResponse( id=member.id, sub=_get_display_name(member), role_id=member.role_id, role_name=member.role.role_name if member.role else "unknown", role_display_name=ROLE_DISPLAY_NAMES.get(member.role.role_name, member.role.role_name.title()) if member.role else "Unknown", created_at=member.created_at ) for member in members ] @app.put("/api/projects/{project_id}/members/{user_id}/role", response_model=ProjectMemberResponse) async def update_member_role( project_id: int, user_id: int, request: Request, role_data: UserRoleUpdateRequest, db: AsyncSession = Depends(get_db) ): """ Update a project member's role. Only project admins (role_id=3) can update roles. Admin cannot demote themselves. Args: project_id: The project ID user_id: The user ID to update role_data: The new role ID Returns: The updated member info. """ current_user = await _get_current_user_db(request, db) # Only admins (role_id=3) can update roles _require_role(current_user, [3], "update member roles") await _verify_project_membership(project_id, current_user.id, db) # Check target user is a member of the project project_repo = ProjectRepository(db) if not await project_repo.is_member(project_id, user_id): raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User is not a member of this project" ) # Prevent self-demotion if current_user.id == user_id and role_data.role_id != 3: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="You cannot demote yourself. Ask another admin to change your role." ) # Verify role exists role_repo = RoleRepository(db) role = await role_repo.get_by_id(role_data.role_id) if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid role id {role_data.role_id}" ) # Update the user's role from src.repositories import UserRepository user_repo = UserRepository(db) updated_user = await user_repo.update_role(user_id, role_data.role_id) if not updated_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id {user_id} not found" ) await db.commit() return ProjectMemberResponse( id=updated_user.id, sub=_get_display_name(updated_user), role_id=updated_user.role_id, role_name=role.role_name, role_display_name=ROLE_DISPLAY_NAMES.get(role.role_name, role.role_name.title()), created_at=updated_user.created_at ) @app.post("/api/projects/{project_id}/users", response_model=UserCreateResponse, status_code=status.HTTP_201_CREATED) async def create_project_user( project_id: int, request: Request, user_data: UserCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new user directly from the admin panel and add them to the project. Only project admins (role_id=3) can create users. The user will be created in Keycloak with a temporary password that must be changed on first login. Args: project_id: The project ID to add the user to user_data: The user data (username, email, password, first_name, last_name, role_id) Returns: The created user info. """ current_user = await _get_current_user_db(request, db) # Only admins (role_id=3) can create users _require_role(current_user, [3], "create users") await _verify_project_membership(project_id, current_user.id, db) # Validate role exists role_repo = RoleRepository(db) role = await role_repo.get_by_id(user_data.role_id) if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid role id {user_data.role_id}" ) # Create user in Keycloak keycloak_sub = await KeycloakAdminService.create_user( username=user_data.username, email=user_data.email, password=user_data.password, first_name=user_data.first_name, last_name=user_data.last_name ) # Build full name from first_name and last_name full_name = None if user_data.first_name or user_data.last_name: full_name = f"{user_data.first_name or ''} {user_data.last_name or ''}".strip() # Create user in local database user_repo = UserRepository(db) new_user = await user_repo.create( sub=keycloak_sub, role_id=user_data.role_id, username=user_data.username, full_name=full_name ) # Add user to the project project_repo = ProjectRepository(db) await project_repo.add_member(project_id, new_user.id) await db.commit() logger.info(f"Admin {current_user.id} created new user {new_user.id} ({user_data.username}) for project {project_id}") return UserCreateResponse( id=new_user.id, username=user_data.username, email=user_data.email, full_name=full_name, role_id=user_data.role_id, role_name=role.role_name, role_display_name=ROLE_DISPLAY_NAMES.get(role.role_name, role.role_name.title()) ) @app.put("/api/projects/{project_id}/relationship-types/{type_id}", response_model=RelationshipTypeResponse) async def update_relationship_type( project_id: int, type_id: int, request: Request, type_data: RelationshipTypeUpdateRequest, db: AsyncSession = Depends(get_db) ): """ Update a relationship type. Only project admins (role_id=3) can update relationship types. Args: project_id: The project ID type_id: The relationship type ID to update type_data: The updated relationship type data Returns: The updated relationship type. """ user = await _get_current_user_db(request, db) # Only admins (role_id=3) can update relationship types _require_role(user, [3], "update relationship types") await _verify_project_membership(project_id, user.id, db) rel_type_repo = RelationshipTypeRepository(db) # Check if relationship type exists and belongs to the project existing_type = await rel_type_repo.get_by_id(type_id) if not existing_type: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Relationship type with id {type_id} not found" ) if existing_type.project_id != project_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Relationship type does not belong to this project" ) updated_type = await rel_type_repo.update( relationship_type_id=type_id, type_name=type_data.type_name, type_description=type_data.type_description, inverse_type_name=type_data.inverse_type_name ) await db.commit() return RelationshipTypeResponse.model_validate(updated_type) @app.delete("/api/projects/{project_id}/relationship-types/{type_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_relationship_type( project_id: int, type_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Delete a relationship type. Only project admins (role_id=3) can delete relationship types. This will also delete all links using this relationship type. Args: project_id: The project ID type_id: The relationship type ID to delete """ user = await _get_current_user_db(request, db) # Only admins (role_id=3) can delete relationship types _require_role(user, [3], "delete relationship types") await _verify_project_membership(project_id, user.id, db) rel_type_repo = RelationshipTypeRepository(db) # Check if relationship type exists and belongs to the project existing_type = await rel_type_repo.get_by_id(type_id) if not existing_type: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Relationship type with id {type_id} not found" ) if existing_type.project_id != project_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Relationship type does not belong to this project" ) deleted = await rel_type_repo.delete(type_id) if not deleted: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Relationship type with id {type_id} not found" ) await db.commit() # =========================================== # Requirements Endpoints # =========================================== def _build_requirement_response(req) -> RequirementResponse: """Helper function to build RequirementResponse from a Requirement model.""" # Determine validation status from latest validation validation_status = "Not Validated" validated_by = None validated_at = None validation_version = None if req.validations: # Get the latest validation latest_validation = max(req.validations, key=lambda v: v.created_at or req.created_at) validation_status = latest_validation.status.status_name if latest_validation.status else "Not Validated" # Try to get display name from user relationship if latest_validation.user: validated_by = _get_display_name(latest_validation.user) validated_at = latest_validation.created_at validation_version = latest_validation.req_version_snapshot # Get author (creator) display name author_username = None if req.user: author_username = _get_display_name(req.user) # Get last editor display name last_editor_username = None if req.last_editor: last_editor_username = _get_display_name(req.last_editor) # Get requirement lifecycle status status_response = None if req.status: status_response = RequirementStatusResponse.model_validate(req.status) return RequirementResponse( id=req.id, project_id=req.project_id, req_name=req.req_name, req_desc=req.req_desc, version=req.version, created_at=req.created_at, updated_at=req.updated_at, tag=TagResponse.model_validate(req.tag), priority=req.priority if req.priority else None, groups=[GroupResponse.model_validate(g) for g in req.groups], status=status_response, validation_status=validation_status, validated_by=validated_by, validated_at=validated_at, validation_version=validation_version, author_username=author_username, last_editor_username=last_editor_username, ) @app.get("/api/projects/{project_id}/requirements", response_model=List[RequirementResponse]) async def get_project_requirements( project_id: int, request: Request, group_id: Optional[int] = None, tag_id: Optional[int] = None, db: AsyncSession = Depends(get_db) ): """ Get all requirements for a specific project, optionally filtered by group or tag. User must be a member of the project. Args: project_id: The project ID group_id: Optional group ID to filter by tag_id: Optional tag ID to filter by Returns: List of requirements in the project. """ user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, user.id, db) req_repo = RequirementRepository(db) if group_id: requirements = await req_repo.get_by_group_id(group_id, project_id=project_id) elif tag_id: requirements = await req_repo.get_by_tag_id(tag_id, project_id=project_id) else: requirements = await req_repo.get_by_project_id(project_id) return [_build_requirement_response(req) for req in requirements] @app.get("/api/requirements", response_model=List[RequirementResponse]) async def get_requirements( request: Request, project_id: Optional[int] = None, group_id: Optional[int] = None, tag_id: Optional[int] = None, db: AsyncSession = Depends(get_db) ): """ Get requirements. If project_id is provided, returns requirements for that project. User must be a member of the project. Args: project_id: Required project ID to filter by group_id: Optional group ID to filter by tag_id: Optional tag ID to filter by Returns: List of requirements. """ user = await _get_current_user_db(request, db) if not project_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="project_id is required" ) await _verify_project_membership(project_id, user.id, db) req_repo = RequirementRepository(db) if group_id: requirements = await req_repo.get_by_group_id(group_id, project_id=project_id) elif tag_id: requirements = await req_repo.get_by_tag_id(tag_id, project_id=project_id) else: requirements = await req_repo.get_by_project_id(project_id) return [_build_requirement_response(req) for req in requirements] @app.get("/api/requirements/{requirement_id}", response_model=RequirementResponse) async def get_requirement( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get a specific requirement by ID. User must be a member of the requirement's project. Args: requirement_id: The requirement ID Returns: The requirement if found and user has access. """ user = await _get_current_user_db(request, db) req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(requirement.project_id, user.id, db) return _build_requirement_response(requirement) @app.post("/api/requirements", response_model=RequirementResponse, status_code=status.HTTP_201_CREATED) async def create_requirement( request: Request, req_data: RequirementCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new requirement. User must be a member of the project. Auditors (role_id=2) cannot create requirements. Args: req_data: The requirement data (must include project_id) Returns: The created requirement. """ user = await _get_current_user_db(request, db) # Auditors (role_id=2) cannot create requirements _require_role(user, [1, 3], "create requirements") # Verify user is a member of the project await _verify_project_membership(req_data.project_id, user.id, db) req_repo = RequirementRepository(db) requirement = await req_repo.create( project_id=req_data.project_id, user_id=user.id, tag_id=req_data.tag_id, req_name=req_data.req_name, req_desc=req_data.req_desc, priority_id=req_data.priority_id, group_ids=req_data.group_ids, status_id=req_data.status_id, ) await db.commit() return _build_requirement_response(requirement) @app.put("/api/requirements/{requirement_id}", response_model=RequirementResponse) async def update_requirement( requirement_id: int, request: Request, req_data: RequirementUpdateRequest, db: AsyncSession = Depends(get_db) ): """ Update an existing requirement. User must be a member of the requirement's project. Auditors (role_id=2) cannot edit requirements. Args: requirement_id: The requirement ID to update req_data: The updated requirement data Returns: The updated requirement. """ user = await _get_current_user_db(request, db) # Auditors (role_id=2) cannot edit requirements _require_role(user, [1, 3], "edit requirements") req_repo = RequirementRepository(db) # First check if requirement exists existing_req = await req_repo.get_by_id(requirement_id) if not existing_req: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(existing_req.project_id, user.id, db) requirement = await req_repo.update( requirement_id=requirement_id, editor_id=user.id, req_name=req_data.req_name, req_desc=req_data.req_desc, tag_id=req_data.tag_id, priority_id=req_data.priority_id, group_ids=req_data.group_ids, status_id=req_data.status_id, ) await db.commit() return _build_requirement_response(requirement) @app.delete("/api/requirements/{requirement_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_requirement( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Delete a requirement. User must be a member of the requirement's project. Auditors (role_id=2) cannot delete requirements. Args: requirement_id: The requirement ID to delete """ user = await _get_current_user_db(request, db) # Auditors (role_id=2) cannot delete requirements _require_role(user, [1, 3], "delete requirements") req_repo = RequirementRepository(db) # First check if requirement exists existing_req = await req_repo.get_by_id(requirement_id) if not existing_req: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(existing_req.project_id, user.id, db) await req_repo.delete(requirement_id) await db.commit() @app.get("/api/requirements/{requirement_id}/history", response_model=List[RequirementHistoryResponse]) async def get_requirement_history( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get the version history for a requirement. Returns all previous versions ordered by version (newest first). Note: Group changes are not tracked in history. Args: requirement_id: The requirement to get history for Returns: List of historical versions with tag, priority, and editor info. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(requirement.project_id, user.id, db) # Get history history = await req_repo.get_history(requirement_id) return [RequirementHistoryResponse(**h) for h in history] @app.get("/api/projects/{project_id}/deleted-requirements", response_model=List[DeletedRequirementResponse]) async def get_deleted_requirements( project_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get all deleted requirements for a project. Returns requirements that exist in history but have been deleted. User must be a member of the project. Args: project_id: The project to get deleted requirements for Returns: List of deleted requirements with their last known state. """ user = await _get_current_user_db(request, db) # Verify user is a member of the project await _verify_project_membership(project_id, user.id, db) # Get deleted requirements req_repo = RequirementRepository(db) deleted = await req_repo.get_deleted_requirements(project_id) return [DeletedRequirementResponse(**d) for d in deleted] # =========================================== # Validation Endpoints # =========================================== @app.get("/api/validation-statuses", response_model=List[ValidationStatusResponse]) async def get_validation_statuses(db: AsyncSession = Depends(get_db)): """ Get all validation statuses. Returns: List of validation statuses (Approved, Denied, Partial, Not Validated). """ status_repo = ValidationStatusRepository(db) statuses = await status_repo.get_all() return [ValidationStatusResponse.model_validate(s) for s in statuses] @app.post("/api/requirements/{requirement_id}/validations", response_model=ValidationHistoryResponse, status_code=status.HTTP_201_CREATED) async def create_validation( requirement_id: int, request: Request, validation_data: ValidationCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new validation for a requirement. Only auditors (role_id=2) can validate requirements. Args: requirement_id: The requirement to validate validation_data: The validation status and optional comment Returns: The created validation record. """ user = await _get_current_user_db(request, db) # Only auditors (role_id=2) can validate _require_role(user, [2], "validate requirements") # Check if requirement exists and user has access req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(requirement.project_id, user.id, db) # Verify status exists status_repo = ValidationStatusRepository(db) validation_status = await status_repo.get_by_id(validation_data.status_id) if not validation_status: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid validation status id {validation_data.status_id}" ) # Create the validation validation_repo = ValidationRepository(db) validation = await validation_repo.create( requirement_id=requirement_id, user_id=user.id, status_id=validation_data.status_id, req_version_snapshot=requirement.version, comment=validation_data.comment ) await db.commit() return ValidationHistoryResponse( id=validation.id, status_name=validation_status.status_name, status_id=validation.status_id, req_version_snapshot=validation.req_version_snapshot, comment=validation.comment, created_at=validation.created_at, validator_username=_get_display_name(user), validator_id=user.id ) @app.get("/api/requirements/{requirement_id}/validations", response_model=List[ValidationHistoryResponse]) async def get_validation_history( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get the validation history for a requirement. Returns all validations ordered by date (newest first). Args: requirement_id: The requirement to get validation history for Returns: List of validation records with validator info. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(requirement.project_id, user.id, db) # Get validation history validation_repo = ValidationRepository(db) validations = await validation_repo.get_by_requirement_id(requirement_id) return [ ValidationHistoryResponse( id=v.id, status_name=v.status.status_name, status_id=v.status_id, req_version_snapshot=v.req_version_snapshot, comment=v.comment, created_at=v.created_at, validator_username=_get_display_name(v.user), validator_id=v.user_id ) for v in validations ] # =========================================== # Relationship Types Endpoints # =========================================== @app.get("/api/projects/{project_id}/relationship-types", response_model=List[RelationshipTypeResponse]) async def get_project_relationship_types( project_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get all relationship types for a project. User must be a member of the project. Args: project_id: The project ID Returns: List of relationship types for the project. """ user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, user.id, db) rel_type_repo = RelationshipTypeRepository(db) rel_types = await rel_type_repo.get_by_project_id(project_id) return [RelationshipTypeResponse.model_validate(rt) for rt in rel_types] @app.post("/api/projects/{project_id}/relationship-types", response_model=RelationshipTypeResponse, status_code=status.HTTP_201_CREATED) async def create_relationship_type( project_id: int, request: Request, type_data: RelationshipTypeCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new relationship type for a project. Only project admins (role_id=3) can create relationship types. Args: project_id: The project ID type_data: The relationship type data Returns: The created relationship type. """ user = await _get_current_user_db(request, db) # Only admins (role_id=3) can create relationship types _require_role(user, [3], "create relationship types") await _verify_project_membership(project_id, user.id, db) rel_type_repo = RelationshipTypeRepository(db) rel_type = await rel_type_repo.create( project_id=project_id, type_name=type_data.type_name, type_description=type_data.type_description, inverse_type_name=type_data.inverse_type_name ) await db.commit() return RelationshipTypeResponse.model_validate(rel_type) # =========================================== # Requirement Search Endpoint (for autocomplete) # =========================================== @app.get("/api/projects/{project_id}/requirements/search", response_model=List[RequirementSearchResult]) async def search_requirements( project_id: int, request: Request, q: str = "", exclude_id: Optional[int] = None, db: AsyncSession = Depends(get_db) ): """ Search requirements by name or tag code for autocomplete. User must be a member of the project. Args: project_id: The project ID q: Search query (searches tag_code and req_name) exclude_id: Optional requirement ID to exclude from results (to prevent self-linking) Returns: List of matching requirements (limited to 20). """ user = await _get_current_user_db(request, db) await _verify_project_membership(project_id, user.id, db) req_repo = RequirementRepository(db) requirements = await req_repo.search_by_name_or_tag( project_id=project_id, query=q, exclude_id=exclude_id, limit=20 ) return [ RequirementSearchResult( id=req.id, req_name=req.req_name, tag_code=req.tag.tag_code ) for req in requirements ] # =========================================== # Requirement Links Endpoints # =========================================== @app.get("/api/requirements/{requirement_id}/links", response_model=List[RequirementLinkResponse]) async def get_requirement_links( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get all links for a requirement (both outgoing and incoming). User must be a member of the requirement's project. Args: requirement_id: The requirement ID Returns: List of links with direction info. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) await _verify_project_membership(requirement.project_id, user.id, db) link_repo = RequirementLinkRepository(db) links = await link_repo.get_by_requirement_id(requirement_id) return [RequirementLinkResponse(**link) for link in links] @app.post("/api/requirements/{requirement_id}/links", response_model=RequirementLinkResponse, status_code=status.HTTP_201_CREATED) async def create_requirement_link( requirement_id: int, request: Request, link_data: RequirementLinkCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new link from this requirement to another. User must be a member of the project. Auditors (role_id=2) cannot create links. Args: requirement_id: The source requirement ID link_data: The target requirement and relationship type Returns: The created link. """ user = await _get_current_user_db(request, db) # Auditors cannot create links _require_role(user, [1, 3], "create requirement links") req_repo = RequirementRepository(db) # Check if source requirement exists source_req = await req_repo.get_by_id(requirement_id) if not source_req: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) await _verify_project_membership(source_req.project_id, user.id, db) # Check if target requirement exists target_req = await req_repo.get_by_id(link_data.target_requirement_id) if not target_req: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Target requirement with id {link_data.target_requirement_id} not found" ) # Prevent self-linking if requirement_id == link_data.target_requirement_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot link a requirement to itself" ) # Verify both requirements are in the same project if source_req.project_id != target_req.project_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot link requirements from different projects" ) # Verify relationship type exists and belongs to the project rel_type_repo = RelationshipTypeRepository(db) rel_type = await rel_type_repo.get_by_id(link_data.relationship_type_id) if not rel_type: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Relationship type with id {link_data.relationship_type_id} not found" ) if rel_type.project_id != source_req.project_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Relationship type does not belong to this project" ) # Check if link already exists link_repo = RequirementLinkRepository(db) if await link_repo.link_exists(requirement_id, link_data.target_requirement_id, link_data.relationship_type_id): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="This link already exists" ) # Create the link link = await link_repo.create( source_req_id=requirement_id, target_req_id=link_data.target_requirement_id, relationship_type_id=link_data.relationship_type_id, created_by=user.id ) await db.commit() return RequirementLinkResponse( id=link.id, direction="outgoing", type_name=rel_type.type_name, type_id=link.relationship_type_id, inverse_type_name=rel_type.inverse_type_name, linked_requirement={ "id": target_req.id, "req_name": target_req.req_name, "tag_code": target_req.tag.tag_code }, created_by_username=_get_display_name(user), created_by_id=user.id, created_at=link.created_at ) @app.delete("/api/requirement-links/{link_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_requirement_link( link_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Delete a requirement link. Only the creator or an admin can delete a link. Args: link_id: The link ID to delete """ user = await _get_current_user_db(request, db) link_repo = RequirementLinkRepository(db) link = await link_repo.get_by_id(link_id) if not link: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Link with id {link_id} not found" ) # Verify user is a member of the project await _verify_project_membership(link.source_requirement.project_id, user.id, db) # Only creator or admin can delete if link.created_by != user.id and user.role_id != 1: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only the link creator or an admin can delete this link" ) await link_repo.delete(link_id) await db.commit() @app.get("/api/requirements/{requirement_id}/links/history", response_model=List[RequirementLinkHistoryResponse]) async def get_requirement_link_history( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get the link history for a requirement (deleted/changed links). Returns all historical links where this requirement was source or target. Args: requirement_id: The requirement to get link history for Returns: List of historical links with relationship type snapshots and requirement info. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(requirement.project_id, user.id, db) # Get link history link_repo = RequirementLinkRepository(db) history = await link_repo.get_history_by_requirement_id(requirement_id) return [RequirementLinkHistoryResponse(**h) for h in history] @app.get("/api/requirements/{requirement_id}/groups/history", response_model=List[RequirementGroupHistoryResponse]) async def get_requirement_group_history( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get the group association history for a requirement (removed groups). Returns all historical group associations for this requirement. Args: requirement_id: The requirement to get group history for Returns: List of historical group associations with group name/color snapshots. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(requirement.project_id, user.id, db) # Get group history group_repo = GroupRepository(db) history = await group_repo.get_group_history_by_requirement_id(requirement_id) return [RequirementGroupHistoryResponse(**h) for h in history] @app.get("/api/requirements/{requirement_id}/groups/current", response_model=List[CurrentRequirementGroupResponse]) async def get_requirement_current_groups( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get the current groups for a requirement with their association timestamps. Returns all current group associations with when they were added. Args: requirement_id: The requirement to get current groups for Returns: List of current group associations with group name/color and created_at timestamp. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) # Verify user is a member of the requirement's project await _verify_project_membership(requirement.project_id, user.id, db) # Get current groups group_repo = GroupRepository(db) groups = await group_repo.get_current_groups_by_requirement_id(requirement_id) return [CurrentRequirementGroupResponse(**g) for g in groups] # =========================================== # Comment Endpoints # =========================================== def _build_comment_response(comment) -> CommentResponse: """Helper function to build CommentResponse from a RequirementComment model.""" return CommentResponse( id=comment.id, comment_text=comment.comment_text, created_at=comment.created_at, updated_at=comment.updated_at, author_id=comment.user_id, author_username=comment.user.username if comment.user else None, author_full_name=comment.user.full_name if comment.user else None, author_role=comment.user.role.role_name if comment.user and comment.user.role else None, replies=[ CommentReplyResponse( id=reply.id, reply_text=reply.reply_text, created_at=reply.created_at, updated_at=reply.updated_at, author_id=reply.user_id, author_username=reply.user.username if reply.user else None, author_full_name=reply.user.full_name if reply.user else None, author_role=reply.user.role.role_name if reply.user and reply.user.role else None, ) for reply in comment.replies ] ) @app.get("/api/requirements/{requirement_id}/comments", response_model=List[CommentResponse]) async def get_requirement_comments( requirement_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get all comments for a requirement with their replies. User must be a member of the requirement's project. Args: requirement_id: The requirement ID Returns: List of comments with nested replies. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) await _verify_project_membership(requirement.project_id, user.id, db) comment_repo = CommentRepository(db) comments = await comment_repo.get_comments_by_requirement_id(requirement_id) return [_build_comment_response(c) for c in comments] @app.post("/api/requirements/{requirement_id}/comments", response_model=CommentResponse, status_code=status.HTTP_201_CREATED) async def create_comment( requirement_id: int, request: Request, comment_data: CommentCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new comment on a requirement. Any project member can create comments. Args: requirement_id: The requirement to comment on comment_data: The comment content Returns: The created comment. """ user = await _get_current_user_db(request, db) # Check if requirement exists req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(requirement_id) if not requirement: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Requirement with id {requirement_id} not found" ) await _verify_project_membership(requirement.project_id, user.id, db) # Validate comment text if not comment_data.comment_text.strip(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Comment text cannot be empty" ) comment_repo = CommentRepository(db) comment = await comment_repo.create_comment( requirement_id=requirement_id, user_id=user.id, comment_text=comment_data.comment_text.strip() ) await db.commit() # Fetch the complete comment with user data comment = await comment_repo.get_comment_by_id(comment.id) return _build_comment_response(comment) @app.delete("/api/comments/{comment_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_comment( comment_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Soft delete a comment (hides it from view). Only the comment author or an admin can delete. Args: comment_id: The comment ID to delete """ user = await _get_current_user_db(request, db) comment_repo = CommentRepository(db) comment = await comment_repo.get_comment_by_id(comment_id) if not comment: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Comment with id {comment_id} not found" ) # Verify user is a member of the project req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(comment.requirement_id) await _verify_project_membership(requirement.project_id, user.id, db) # Only author or admin can delete if comment.user_id != user.id and user.role_id != 3: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only the comment author or an admin can delete this comment" ) await comment_repo.soft_delete_comment(comment_id) await db.commit() @app.post("/api/comments/{comment_id}/replies", response_model=CommentReplyResponse, status_code=status.HTTP_201_CREATED) async def create_reply( comment_id: int, request: Request, reply_data: ReplyCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a reply to a comment. Any project member can create replies. Args: comment_id: The parent comment ID reply_data: The reply content Returns: The created reply. """ user = await _get_current_user_db(request, db) # Check if parent comment exists comment_repo = CommentRepository(db) parent_comment = await comment_repo.get_comment_by_id(comment_id) if not parent_comment: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Comment with id {comment_id} not found" ) if parent_comment.is_deleted: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot reply to a deleted comment" ) # Verify user is a member of the project req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(parent_comment.requirement_id) await _verify_project_membership(requirement.project_id, user.id, db) # Validate reply text if not reply_data.reply_text.strip(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Reply text cannot be empty" ) reply_repo = ReplyRepository(db) reply = await reply_repo.create_reply( parent_comment_id=comment_id, user_id=user.id, reply_text=reply_data.reply_text.strip() ) await db.commit() # Fetch the complete reply with user data reply = await reply_repo.get_reply_by_id(reply.id) return CommentReplyResponse( id=reply.id, reply_text=reply.reply_text, created_at=reply.created_at, updated_at=reply.updated_at, author_id=reply.user_id, author_username=reply.user.username if reply.user else None, author_full_name=reply.user.full_name if reply.user else None, author_role=reply.user.role.role_name if reply.user and reply.user.role else None, ) @app.delete("/api/replies/{reply_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_reply( reply_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Soft delete a reply (hides it from view). Only the reply author or an admin can delete. Args: reply_id: The reply ID to delete """ user = await _get_current_user_db(request, db) reply_repo = ReplyRepository(db) reply = await reply_repo.get_reply_by_id(reply_id) if not reply: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Reply with id {reply_id} not found" ) # Get the parent comment to check project membership comment_repo = CommentRepository(db) parent_comment = await comment_repo.get_comment_by_id(reply.parent_comment_id) # Verify user is a member of the project req_repo = RequirementRepository(db) requirement = await req_repo.get_by_id(parent_comment.requirement_id) await _verify_project_membership(requirement.project_id, user.id, db) # Only author or admin can delete if reply.user_id != user.id and user.role_id != 3: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only the reply author or an admin can delete this reply" ) await reply_repo.soft_delete_reply(reply_id) await db.commit() # =========================================== # Super Admin System Endpoints (role_id=6 only) # =========================================== def _require_super_admin(user): """ Helper to check if user is a super admin (role_id=6). Raises: HTTPException: 403 Forbidden if user is not a super admin """ if user.role_id != 6: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Only super admins can access this endpoint" ) @app.get("/api/admin/system/users", response_model=List[SystemUserResponse]) async def get_all_system_users( request: Request, db: AsyncSession = Depends(get_db) ): """ Get all users in the system (super admin only). Includes Keycloak enabled status. Returns: List of all users with their roles and status. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) user_repo = UserRepository(db) users = await user_repo.get_all() # Build response with Keycloak status result = [] for user in users: # Get enabled status from Keycloak is_enabled = True try: is_enabled = await KeycloakAdminService.get_user_status(user.sub) except Exception as e: logger.warning(f"Could not get Keycloak status for user {user.id}: {e}") result.append(SystemUserResponse( id=user.id, sub=user.sub, username=user.username, full_name=user.full_name, role_id=user.role_id, role_name=user.role.role_name if user.role else "unknown", role_display_name=ROLE_DISPLAY_NAMES.get(user.role.role_name, user.role.role_name.title()) if user.role else "Unknown", is_enabled=is_enabled, created_at=user.created_at )) return result @app.get("/api/admin/system/projects", response_model=List[SystemProjectResponse]) async def get_all_system_projects( request: Request, db: AsyncSession = Depends(get_db) ): """ Get all projects in the system (super admin only). Includes member count for each project. Returns: List of all projects with member counts. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) project_repo = ProjectRepository(db) projects = await project_repo.get_all() result = [] for project in projects: member_count = await project_repo.get_member_count(project.id) result.append(SystemProjectResponse( id=project.id, project_name=project.project_name, project_desc=project.project_desc, member_count=member_count, created_at=project.created_at )) return result @app.post("/api/admin/system/projects", response_model=SystemProjectResponse, status_code=status.HTTP_201_CREATED) async def create_system_project( request: Request, project_data: ProjectCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new project (super admin only). Unlike regular project creation, this doesn't add the creator as a member. Args: project_data: The project data Returns: The created project. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) project_repo = ProjectRepository(db) project = await project_repo.create( project_name=project_data.project_name, project_desc=project_data.project_desc, creator_id=None, # Super admin creates without being a member ) await db.commit() return SystemProjectResponse( id=project.id, project_name=project.project_name, project_desc=project.project_desc, member_count=0, created_at=project.created_at ) @app.get("/api/admin/system/projects/{project_id}/members", response_model=List[SystemProjectMemberResponse]) async def get_system_project_members( project_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Get all members of a project (super admin only). Args: project_id: The project ID Returns: List of project members. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) project_repo = ProjectRepository(db) # Check if project exists project = await project_repo.get_by_id(project_id) if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id {project_id} not found" ) members = await project_repo.get_members(project_id) return [ SystemProjectMemberResponse( user_id=member.id, username=member.username, full_name=member.full_name, role_id=member.role_id, role_name=member.role.role_name if member.role else "unknown", joined_at=None # TODO: Add joined_at from project_members table ) for member in members ] @app.post("/api/admin/system/projects/{project_id}/members", status_code=status.HTTP_201_CREATED) async def assign_user_to_project( project_id: int, request: Request, data: AssignUserToProjectRequest, db: AsyncSession = Depends(get_db) ): """ Assign a user to a project (super admin only). Args: project_id: The project ID data: The user to assign Returns: Success message. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) project_repo = ProjectRepository(db) user_repo = UserRepository(db) # Check if project exists project = await project_repo.get_by_id(project_id) if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id {project_id} not found" ) # Check if user exists user = await user_repo.get_by_id(data.user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id {data.user_id} not found" ) # Optionally update role if provided if data.role_id is not None: role_repo = RoleRepository(db) role = await role_repo.get_by_id(data.role_id) if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid role id {data.role_id}" ) await user_repo.update_role(data.user_id, data.role_id) # Add user to project added = await project_repo.add_member(project_id, data.user_id) if not added: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="User is already a member of this project" ) await db.commit() return {"message": "User assigned to project successfully"} @app.delete("/api/admin/system/projects/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def remove_user_from_project( project_id: int, user_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Remove a user from a project (super admin only). Args: project_id: The project ID user_id: The user ID to remove """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) project_repo = ProjectRepository(db) # Check if project exists project = await project_repo.get_by_id(project_id) if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with id {project_id} not found" ) removed = await project_repo.remove_member(project_id, user_id) if not removed: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User is not a member of this project" ) await db.commit() @app.put("/api/admin/system/users/{user_id}/block", status_code=status.HTTP_200_OK) async def block_user( user_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Block a user by disabling them in Keycloak (super admin only). Blocked users cannot log in. Args: user_id: The user ID to block Returns: Success message. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) # Prevent blocking yourself if current_user.id == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="You cannot block yourself" ) user_repo = UserRepository(db) user = await user_repo.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id {user_id} not found" ) # Disable in Keycloak await KeycloakAdminService.disable_user(user.sub) return {"message": f"User {user.username or user.sub} has been blocked"} @app.put("/api/admin/system/users/{user_id}/unblock", status_code=status.HTTP_200_OK) async def unblock_user( user_id: int, request: Request, db: AsyncSession = Depends(get_db) ): """ Unblock a user by enabling them in Keycloak (super admin only). Args: user_id: The user ID to unblock Returns: Success message. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) user_repo = UserRepository(db) user = await user_repo.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id {user_id} not found" ) # Enable in Keycloak await KeycloakAdminService.enable_user(user.sub) return {"message": f"User {user.username or user.sub} has been unblocked"} @app.put("/api/admin/system/users/{user_id}/role", response_model=SystemUserResponse) async def update_system_user_role( user_id: int, request: Request, role_data: UserRoleUpdateRequest, db: AsyncSession = Depends(get_db) ): """ Update a user's role (super admin only). Can promote users to super_admin or demote them. Args: user_id: The user ID to update role_data: The new role ID Returns: The updated user info. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) # Prevent self-demotion from super admin if current_user.id == user_id and role_data.role_id != 6: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="You cannot demote yourself. Ask another super admin to change your role." ) # Verify role exists role_repo = RoleRepository(db) role = await role_repo.get_by_id(role_data.role_id) if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid role id {role_data.role_id}" ) # Update the user's role user_repo = UserRepository(db) updated_user = await user_repo.update_role(user_id, role_data.role_id) if not updated_user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"User with id {user_id} not found" ) await db.commit() # Get enabled status from Keycloak is_enabled = True try: is_enabled = await KeycloakAdminService.get_user_status(updated_user.sub) except Exception as e: logger.warning(f"Could not get Keycloak status for user {user_id}: {e}") return SystemUserResponse( id=updated_user.id, sub=updated_user.sub, username=updated_user.username, full_name=updated_user.full_name, role_id=updated_user.role_id, role_name=role.role_name, role_display_name=ROLE_DISPLAY_NAMES.get(role.role_name, role.role_name.title()), is_enabled=is_enabled, created_at=updated_user.created_at ) @app.post("/api/admin/system/users", response_model=SystemUserResponse, status_code=status.HTTP_201_CREATED) async def create_system_user( request: Request, user_data: SystemUserCreateRequest, db: AsyncSession = Depends(get_db) ): """ Create a new user in the system (super admin only). Creates the user in Keycloak with a temporary password. Args: user_data: The user data Returns: The created user info. """ current_user = await _get_current_user_db(request, db) _require_super_admin(current_user) # Validate role exists and is not super_admin unless current user is super admin role_repo = RoleRepository(db) role = await role_repo.get_by_id(user_data.role_id) if not role: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid role id {user_data.role_id}" ) # Create user in Keycloak keycloak_sub = await KeycloakAdminService.create_user( username=user_data.username, email=user_data.email, password=user_data.password, first_name=user_data.first_name, last_name=user_data.last_name ) # Build full name full_name = None if user_data.first_name or user_data.last_name: full_name = f"{user_data.first_name or ''} {user_data.last_name or ''}".strip() # Create in local database user_repo = UserRepository(db) new_user = await user_repo.create( sub=keycloak_sub, role_id=user_data.role_id, username=user_data.username, full_name=full_name ) await db.commit() logger.info(f"Super admin {current_user.id} created new user {new_user.id} ({user_data.username})") return SystemUserResponse( id=new_user.id, sub=new_user.sub, username=new_user.username, full_name=new_user.full_name, role_id=new_user.role_id, role_name=role.role_name, role_display_name=ROLE_DISPLAY_NAMES.get(role.role_name, role.role_name.title()), is_enabled=True, created_at=new_user.created_at )