Files
periodic-table/backend/src/main.py

1863 lines
58 KiB
Python

from contextlib import asynccontextmanager
from typing import List, Optional
from fastapi import FastAPI, Depends, Request, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import RedirectResponse
from sqlalchemy.ext.asyncio import AsyncSession
from src.models import (
TokenResponse, UserInfo, GroupResponse,
TagResponse, RequirementResponse, PriorityResponse,
RequirementCreateRequest, RequirementUpdateRequest, RequirementHistoryResponse,
ProjectResponse, ProjectCreateRequest, ProjectUpdateRequest, ProjectMemberRequest,
ValidationStatusResponse, ValidationHistoryResponse, ValidationCreateRequest,
RelationshipTypeResponse, RelationshipTypeCreateRequest, RelationshipTypeUpdateRequest,
RequirementLinkResponse, RequirementLinkCreateRequest, RequirementSearchResult,
RoleResponse, ProjectMemberResponse, UserRoleUpdateRequest, ROLE_DISPLAY_NAMES,
CommentResponse, CommentReplyResponse, CommentCreateRequest, ReplyCreateRequest,
RequirementStatusResponse, DeletedRequirementResponse
)
from src.controller import AuthController
from src.config import get_openid, get_settings
from src.database import init_db, close_db, get_db
from src.repositories import (
RoleRepository, GroupRepository, TagRepository, RequirementRepository,
PriorityRepository, ProjectRepository, ValidationStatusRepository, ValidationRepository,
RelationshipTypeRepository, RequirementLinkRepository, CommentRepository, ReplyRepository,
RequirementStatusRepository
)
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Get settings
settings = get_settings()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan manager.
Handles startup and shutdown events.
"""
# Startup
logger.info("Starting up application...")
logger.info("Initializing database...")
await init_db()
logger.info("Database initialized successfully")
# Ensure default roles exist
from src.database import AsyncSessionLocal
async with AsyncSessionLocal() as session:
role_repo = RoleRepository(session)
await role_repo.ensure_default_roles_exist()
await session.commit()
logger.info("Default roles ensured")
# Ensure default requirement statuses exist
async with AsyncSessionLocal() as session:
req_status_repo = RequirementStatusRepository(session)
await req_status_repo.ensure_default_statuses_exist()
await session.commit()
logger.info("Default requirement statuses ensured")
# Ensure default validation statuses exist
async with AsyncSessionLocal() as session:
await session.execute(
__import__('sqlalchemy').text(
"""
INSERT INTO validation_statuses (id, status_name) VALUES
(1, 'Approved'),
(2, 'Denied'),
(3, 'Partial'),
(4, 'Not Validated')
ON CONFLICT (id) DO NOTHING
"""
)
)
await session.commit()
logger.info("Default validation statuses ensured")
yield
# Shutdown
logger.info("Shutting down application...")
await close_db()
logger.info("Database connection closed")
# Initialize the FastAPI app
app = FastAPI(
title="Keycloak Auth API",
version="1.0.0",
lifespan=lifespan
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=[settings.frontend_url],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize the HTTPBearer scheme for authentication
bearer_scheme = HTTPBearer()
# Configure client
keycloak_openid = get_openid()
# Define the root endpoint
@app.get("/api")
async def read_root():
"""
Root endpoint that provides a welcome message and documentation link.
"""
return AuthController.read_root()
# Define the login endpoint
@app.get("/api/login", response_class=RedirectResponse)
async def login(request: Request):
"""
Login endpoint to authenticate the user and return an access token.
Returns:
RedirectResponse: Contains the redirect URL upon successful authentication.
"""
# Build the callback URI using the frontend URL (accessible from browser)
# This ensures the redirect works correctly through nginx proxy
redirect_uri = f"{settings.frontend_url}/api/callback"
# Construct the authorization URL with external Keycloak URL
auth_url = (
f"{settings.keycloak_external_url}realms/{settings.keycloak_realm}"
f"/protocol/openid-connect/auth"
f"?client_id={settings.keycloak_client_id}"
f"&response_type=code"
f"&redirect_uri={redirect_uri}"
f"&scope=openid%20profile%20email"
)
return RedirectResponse(auth_url)
# Define the callback endpoint
@app.get("/api/callback", include_in_schema=False)
async def callback(request: Request, db: AsyncSession = Depends(get_db)):
"""
OAuth callback endpoint that exchanges the authorization code for a token,
provisions the user in the database if needed, and sets it as an HTTP-only cookie.
"""
# Extract the code from the URL
keycode = request.query_params.get('code')
return await AuthController.login(str(keycode), request, db)
# Define the auth/me endpoint to get current user from cookie
@app.get("/api/auth/me", response_model=UserInfo)
async def get_current_user(request: Request, db: AsyncSession = Depends(get_db)):
"""
Get the current authenticated user from the session cookie.
Includes role information from the database.
Returns:
UserInfo: Information about the authenticated user including role.
"""
user_info = AuthController.get_current_user(request)
# Fetch role information from database
from src.repositories import UserRepository
user_repo = UserRepository(db)
db_user = await user_repo.get_by_sub(user_info.sub)
if db_user:
user_info.db_user_id = db_user.id
user_info.role_id = db_user.role_id
user_info.role = db_user.role.role_name if db_user.role else None
return user_info
# Define the logout endpoint
@app.post("/api/auth/logout")
async def logout(request: Request):
"""
Logout endpoint that clears the authentication cookie.
Returns:
dict: Success message.
"""
return AuthController.logout()
# Define the protected endpoint (kept for API token-based access)
@app.get("/api/protected", response_model=UserInfo)
async def protected_endpoint(
credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
):
"""
Protected endpoint that requires a valid token for access.
Args:
credentials (HTTPAuthorizationCredentials):
Bearer token provided via HTTP Authorization header.
Returns:
UserInfo: Information about the authenticated user.
"""
return AuthController.protected_endpoint(credentials)
# ===========================================
# Groups Endpoints
# ===========================================
@app.get("/api/groups", response_model=List[GroupResponse])
async def get_groups(db: AsyncSession = Depends(get_db)):
"""
Get all groups.
Returns:
List of all groups with their names and colors.
"""
group_repo = GroupRepository(db)
groups = await group_repo.get_all()
return [GroupResponse.model_validate(g) for g in groups]
# ===========================================
# Tags Endpoints
# ===========================================
@app.get("/api/tags", response_model=List[TagResponse])
async def get_tags(db: AsyncSession = Depends(get_db)):
"""
Get all tags.
Returns:
List of all tags with their codes and descriptions.
"""
tag_repo = TagRepository(db)
tags = await tag_repo.get_all()
return [TagResponse.model_validate(t) for t in tags]
@app.get("/api/tags/{tag_id}", response_model=TagResponse)
async def get_tag(tag_id: int, db: AsyncSession = Depends(get_db)):
"""
Get a specific tag by ID.
Args:
tag_id: The tag ID
Returns:
The tag if found.
"""
tag_repo = TagRepository(db)
tag = await tag_repo.get_by_id(tag_id)
if not tag:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Tag with id {tag_id} not found"
)
return TagResponse.model_validate(tag)
# ===========================================
# Priorities Endpoints
# ===========================================
@app.get("/api/priorities", response_model=List[PriorityResponse])
async def get_priorities(db: AsyncSession = Depends(get_db)):
"""
Get all priorities.
Returns:
List of all priorities ordered by priority_num.
"""
priority_repo = PriorityRepository(db)
priorities = await priority_repo.get_all()
return [PriorityResponse.model_validate(p) for p in priorities]
# ===========================================
# Requirement Statuses Endpoints
# ===========================================
@app.get("/api/requirement-statuses", response_model=List[RequirementStatusResponse])
async def get_requirement_statuses(db: AsyncSession = Depends(get_db)):
"""
Get all requirement lifecycle statuses (Draft, Regular, etc.).
Returns:
List of all requirement statuses.
"""
status_repo = RequirementStatusRepository(db)
statuses = await status_repo.get_all()
return [RequirementStatusResponse.model_validate(s) for s in statuses]
# ===========================================
# Projects Endpoints
# ===========================================
async def _get_current_user_db(request: Request, db: AsyncSession):
"""Helper to get the current authenticated user from the database."""
user_info = AuthController.get_current_user(request)
from src.repositories import UserRepository
user_repo = UserRepository(db)
user = await user_repo.get_by_sub(user_info.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found in database"
)
return user
def _require_role(user, allowed_role_ids: List[int], action: str = "perform this action"):
"""
Helper to check if user has one of the allowed roles.
Args:
user: The database user object
allowed_role_ids: List of role IDs that are permitted (e.g., [1, 3] for admin and user)
action: Description of the action for error message
Raises:
HTTPException: 403 Forbidden if user's role is not in allowed list
"""
if user.role_id not in allowed_role_ids:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Your role does not have permission to {action}"
)
def _get_display_name(user) -> str:
"""
Get the best display name for a user.
Falls back in order: full_name -> username -> sub.
Args:
user: The database user object
Returns:
The best available display name for the user
"""
return user.full_name or user.username or user.sub
async def _verify_project_membership(project_id: int, user_id: int, db: AsyncSession):
"""Helper to verify user is a member of a project."""
project_repo = ProjectRepository(db)
# Check if project exists
project = await project_repo.get_by_id(project_id)
if not project:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Project with id {project_id} not found"
)
# Check if user is a member
is_member = await project_repo.is_member(project_id, user_id)
if not is_member:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You are not a member of this project"
)
return project
@app.get("/api/projects", response_model=List[ProjectResponse])
async def get_my_projects(
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get all projects the authenticated user is a member of.
Returns:
List of projects the user belongs to.
"""
user = await _get_current_user_db(request, db)
project_repo = ProjectRepository(db)
projects = await project_repo.get_by_user_id(user.id)
return [ProjectResponse.model_validate(p) for p in projects]
@app.get("/api/projects/{project_id}", response_model=ProjectResponse)
async def get_project(
project_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get a specific project by ID.
User must be a member of the project.
Args:
project_id: The project ID
Returns:
The project if found and user is a member.
"""
user = await _get_current_user_db(request, db)
project = await _verify_project_membership(project_id, user.id, db)
return ProjectResponse.model_validate(project)
@app.post("/api/projects", response_model=ProjectResponse, status_code=status.HTTP_201_CREATED)
async def create_project(
request: Request,
project_data: ProjectCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new project.
The creating user will automatically be added as a member.
Args:
project_data: The project data
Returns:
The created project.
"""
user = await _get_current_user_db(request, db)
project_repo = ProjectRepository(db)
project = await project_repo.create(
project_name=project_data.project_name,
project_desc=project_data.project_desc,
creator_id=user.id,
)
await db.commit()
return ProjectResponse.model_validate(project)
@app.put("/api/projects/{project_id}", response_model=ProjectResponse)
async def update_project(
project_id: int,
request: Request,
project_data: ProjectUpdateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Update an existing project.
User must be a member of the project.
Args:
project_id: The project ID to update
project_data: The updated project data
Returns:
The updated project.
"""
user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, user.id, db)
project_repo = ProjectRepository(db)
project = await project_repo.update(
project_id=project_id,
project_name=project_data.project_name,
project_desc=project_data.project_desc,
)
await db.commit()
return ProjectResponse.model_validate(project)
@app.delete("/api/projects/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_project(
project_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Delete a project.
User must be a member of the project.
Args:
project_id: The project ID to delete
"""
user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, user.id, db)
project_repo = ProjectRepository(db)
await project_repo.delete(project_id)
await db.commit()
@app.post("/api/projects/{project_id}/members", status_code=status.HTTP_201_CREATED)
async def add_project_member(
project_id: int,
request: Request,
member_data: ProjectMemberRequest,
db: AsyncSession = Depends(get_db)
):
"""
Add a member to a project.
User must be a member of the project.
Args:
project_id: The project ID
member_data: The user to add
Returns:
Success message.
"""
user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, user.id, db)
project_repo = ProjectRepository(db)
added = await project_repo.add_member(project_id, member_data.user_id)
if not added:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User is already a member of this project"
)
await db.commit()
return {"message": "Member added successfully"}
@app.delete("/api/projects/{project_id}/members/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def remove_project_member(
project_id: int,
user_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Remove a member from a project.
User must be a member of the project.
Args:
project_id: The project ID
user_id: The user ID to remove
"""
current_user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, current_user.id, db)
project_repo = ProjectRepository(db)
removed = await project_repo.remove_member(project_id, user_id)
if not removed:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User is not a member of this project"
)
await db.commit()
# ===========================================
# Admin Endpoints (Role Management, Project Admin)
# ===========================================
@app.get("/api/roles", response_model=List[RoleResponse])
async def get_all_roles(
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get all available roles with their display names.
Returns:
List of roles with id, role_name, and display_name.
"""
# Ensure user is authenticated
await _get_current_user_db(request, db)
role_repo = RoleRepository(db)
roles = await role_repo.get_all()
return [RoleResponse.from_role(r) for r in roles]
@app.get("/api/projects/{project_id}/members", response_model=List[ProjectMemberResponse])
async def get_project_members(
project_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get all members of a project with their role information.
User must be a member of the project.
Args:
project_id: The project ID
Returns:
List of project members with role info.
"""
user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, user.id, db)
project_repo = ProjectRepository(db)
members = await project_repo.get_members(project_id)
return [
ProjectMemberResponse(
id=member.id,
sub=_get_display_name(member),
role_id=member.role_id,
role_name=member.role.role_name if member.role else "unknown",
role_display_name=ROLE_DISPLAY_NAMES.get(member.role.role_name, member.role.role_name.title()) if member.role else "Unknown",
created_at=member.created_at
)
for member in members
]
@app.put("/api/projects/{project_id}/members/{user_id}/role", response_model=ProjectMemberResponse)
async def update_member_role(
project_id: int,
user_id: int,
request: Request,
role_data: UserRoleUpdateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Update a project member's role.
Only project admins (role_id=3) can update roles.
Admin cannot demote themselves.
Args:
project_id: The project ID
user_id: The user ID to update
role_data: The new role ID
Returns:
The updated member info.
"""
current_user = await _get_current_user_db(request, db)
# Only admins (role_id=3) can update roles
_require_role(current_user, [3], "update member roles")
await _verify_project_membership(project_id, current_user.id, db)
# Check target user is a member of the project
project_repo = ProjectRepository(db)
if not await project_repo.is_member(project_id, user_id):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User is not a member of this project"
)
# Prevent self-demotion
if current_user.id == user_id and role_data.role_id != 3:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You cannot demote yourself. Ask another admin to change your role."
)
# Verify role exists
role_repo = RoleRepository(db)
role = await role_repo.get_by_id(role_data.role_id)
if not role:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid role id {role_data.role_id}"
)
# Update the user's role
from src.repositories import UserRepository
user_repo = UserRepository(db)
updated_user = await user_repo.update_role(user_id, role_data.role_id)
if not updated_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
await db.commit()
return ProjectMemberResponse(
id=updated_user.id,
sub=_get_display_name(updated_user),
role_id=updated_user.role_id,
role_name=role.role_name,
role_display_name=ROLE_DISPLAY_NAMES.get(role.role_name, role.role_name.title()),
created_at=updated_user.created_at
)
@app.put("/api/projects/{project_id}/relationship-types/{type_id}", response_model=RelationshipTypeResponse)
async def update_relationship_type(
project_id: int,
type_id: int,
request: Request,
type_data: RelationshipTypeUpdateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Update a relationship type.
Only project admins (role_id=3) can update relationship types.
Args:
project_id: The project ID
type_id: The relationship type ID to update
type_data: The updated relationship type data
Returns:
The updated relationship type.
"""
user = await _get_current_user_db(request, db)
# Only admins (role_id=3) can update relationship types
_require_role(user, [3], "update relationship types")
await _verify_project_membership(project_id, user.id, db)
rel_type_repo = RelationshipTypeRepository(db)
# Check if relationship type exists and belongs to the project
existing_type = await rel_type_repo.get_by_id(type_id)
if not existing_type:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Relationship type with id {type_id} not found"
)
if existing_type.project_id != project_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Relationship type does not belong to this project"
)
updated_type = await rel_type_repo.update(
relationship_type_id=type_id,
type_name=type_data.type_name,
type_description=type_data.type_description,
inverse_type_name=type_data.inverse_type_name
)
await db.commit()
return RelationshipTypeResponse.model_validate(updated_type)
@app.delete("/api/projects/{project_id}/relationship-types/{type_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_relationship_type(
project_id: int,
type_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Delete a relationship type.
Only project admins (role_id=3) can delete relationship types.
This will also delete all links using this relationship type.
Args:
project_id: The project ID
type_id: The relationship type ID to delete
"""
user = await _get_current_user_db(request, db)
# Only admins (role_id=3) can delete relationship types
_require_role(user, [3], "delete relationship types")
await _verify_project_membership(project_id, user.id, db)
rel_type_repo = RelationshipTypeRepository(db)
# Check if relationship type exists and belongs to the project
existing_type = await rel_type_repo.get_by_id(type_id)
if not existing_type:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Relationship type with id {type_id} not found"
)
if existing_type.project_id != project_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Relationship type does not belong to this project"
)
deleted = await rel_type_repo.delete(type_id)
if not deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Relationship type with id {type_id} not found"
)
await db.commit()
# ===========================================
# Requirements Endpoints
# ===========================================
def _build_requirement_response(req) -> RequirementResponse:
"""Helper function to build RequirementResponse from a Requirement model."""
# Determine validation status from latest validation
validation_status = "Not Validated"
validated_by = None
validated_at = None
validation_version = None
if req.validations:
# Get the latest validation
latest_validation = max(req.validations, key=lambda v: v.created_at or req.created_at)
validation_status = latest_validation.status.status_name if latest_validation.status else "Not Validated"
# Try to get display name from user relationship
if latest_validation.user:
validated_by = _get_display_name(latest_validation.user)
validated_at = latest_validation.created_at
validation_version = latest_validation.req_version_snapshot
# Get author (creator) display name
author_username = None
if req.user:
author_username = _get_display_name(req.user)
# Get last editor display name
last_editor_username = None
if req.last_editor:
last_editor_username = _get_display_name(req.last_editor)
# Get requirement lifecycle status
status_response = None
if req.status:
status_response = RequirementStatusResponse.model_validate(req.status)
return RequirementResponse(
id=req.id,
project_id=req.project_id,
req_name=req.req_name,
req_desc=req.req_desc,
version=req.version,
created_at=req.created_at,
updated_at=req.updated_at,
tag=TagResponse.model_validate(req.tag),
priority=req.priority if req.priority else None,
groups=[GroupResponse.model_validate(g) for g in req.groups],
status=status_response,
validation_status=validation_status,
validated_by=validated_by,
validated_at=validated_at,
validation_version=validation_version,
author_username=author_username,
last_editor_username=last_editor_username,
)
@app.get("/api/projects/{project_id}/requirements", response_model=List[RequirementResponse])
async def get_project_requirements(
project_id: int,
request: Request,
group_id: Optional[int] = None,
tag_id: Optional[int] = None,
db: AsyncSession = Depends(get_db)
):
"""
Get all requirements for a specific project, optionally filtered by group or tag.
User must be a member of the project.
Args:
project_id: The project ID
group_id: Optional group ID to filter by
tag_id: Optional tag ID to filter by
Returns:
List of requirements in the project.
"""
user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, user.id, db)
req_repo = RequirementRepository(db)
if group_id:
requirements = await req_repo.get_by_group_id(group_id, project_id=project_id)
elif tag_id:
requirements = await req_repo.get_by_tag_id(tag_id, project_id=project_id)
else:
requirements = await req_repo.get_by_project_id(project_id)
return [_build_requirement_response(req) for req in requirements]
@app.get("/api/requirements", response_model=List[RequirementResponse])
async def get_requirements(
request: Request,
project_id: Optional[int] = None,
group_id: Optional[int] = None,
tag_id: Optional[int] = None,
db: AsyncSession = Depends(get_db)
):
"""
Get requirements. If project_id is provided, returns requirements for that project.
User must be a member of the project.
Args:
project_id: Required project ID to filter by
group_id: Optional group ID to filter by
tag_id: Optional tag ID to filter by
Returns:
List of requirements.
"""
user = await _get_current_user_db(request, db)
if not project_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="project_id is required"
)
await _verify_project_membership(project_id, user.id, db)
req_repo = RequirementRepository(db)
if group_id:
requirements = await req_repo.get_by_group_id(group_id, project_id=project_id)
elif tag_id:
requirements = await req_repo.get_by_tag_id(tag_id, project_id=project_id)
else:
requirements = await req_repo.get_by_project_id(project_id)
return [_build_requirement_response(req) for req in requirements]
@app.get("/api/requirements/{requirement_id}", response_model=RequirementResponse)
async def get_requirement(
requirement_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get a specific requirement by ID.
User must be a member of the requirement's project.
Args:
requirement_id: The requirement ID
Returns:
The requirement if found and user has access.
"""
user = await _get_current_user_db(request, db)
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
# Verify user is a member of the requirement's project
await _verify_project_membership(requirement.project_id, user.id, db)
return _build_requirement_response(requirement)
@app.post("/api/requirements", response_model=RequirementResponse, status_code=status.HTTP_201_CREATED)
async def create_requirement(
request: Request,
req_data: RequirementCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new requirement.
User must be a member of the project.
Auditors (role_id=2) cannot create requirements.
Args:
req_data: The requirement data (must include project_id)
Returns:
The created requirement.
"""
user = await _get_current_user_db(request, db)
# Auditors (role_id=2) cannot create requirements
_require_role(user, [1, 3], "create requirements")
# Verify user is a member of the project
await _verify_project_membership(req_data.project_id, user.id, db)
req_repo = RequirementRepository(db)
requirement = await req_repo.create(
project_id=req_data.project_id,
user_id=user.id,
tag_id=req_data.tag_id,
req_name=req_data.req_name,
req_desc=req_data.req_desc,
priority_id=req_data.priority_id,
group_ids=req_data.group_ids,
status_id=req_data.status_id,
)
await db.commit()
return _build_requirement_response(requirement)
@app.put("/api/requirements/{requirement_id}", response_model=RequirementResponse)
async def update_requirement(
requirement_id: int,
request: Request,
req_data: RequirementUpdateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Update an existing requirement.
User must be a member of the requirement's project.
Auditors (role_id=2) cannot edit requirements.
Args:
requirement_id: The requirement ID to update
req_data: The updated requirement data
Returns:
The updated requirement.
"""
user = await _get_current_user_db(request, db)
# Auditors (role_id=2) cannot edit requirements
_require_role(user, [1, 3], "edit requirements")
req_repo = RequirementRepository(db)
# First check if requirement exists
existing_req = await req_repo.get_by_id(requirement_id)
if not existing_req:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
# Verify user is a member of the requirement's project
await _verify_project_membership(existing_req.project_id, user.id, db)
requirement = await req_repo.update(
requirement_id=requirement_id,
editor_id=user.id,
req_name=req_data.req_name,
req_desc=req_data.req_desc,
tag_id=req_data.tag_id,
priority_id=req_data.priority_id,
group_ids=req_data.group_ids,
status_id=req_data.status_id,
)
await db.commit()
return _build_requirement_response(requirement)
@app.delete("/api/requirements/{requirement_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_requirement(
requirement_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Delete a requirement.
User must be a member of the requirement's project.
Auditors (role_id=2) cannot delete requirements.
Args:
requirement_id: The requirement ID to delete
"""
user = await _get_current_user_db(request, db)
# Auditors (role_id=2) cannot delete requirements
_require_role(user, [1, 3], "delete requirements")
req_repo = RequirementRepository(db)
# First check if requirement exists
existing_req = await req_repo.get_by_id(requirement_id)
if not existing_req:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
# Verify user is a member of the requirement's project
await _verify_project_membership(existing_req.project_id, user.id, db)
await req_repo.delete(requirement_id)
await db.commit()
@app.get("/api/requirements/{requirement_id}/history", response_model=List[RequirementHistoryResponse])
async def get_requirement_history(
requirement_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get the version history for a requirement.
Returns all previous versions ordered by version (newest first).
Note: Group changes are not tracked in history.
Args:
requirement_id: The requirement to get history for
Returns:
List of historical versions with tag, priority, and editor info.
"""
user = await _get_current_user_db(request, db)
# Check if requirement exists
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
# Verify user is a member of the requirement's project
await _verify_project_membership(requirement.project_id, user.id, db)
# Get history
history = await req_repo.get_history(requirement_id)
return [RequirementHistoryResponse(**h) for h in history]
@app.get("/api/projects/{project_id}/deleted-requirements", response_model=List[DeletedRequirementResponse])
async def get_deleted_requirements(
project_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get all deleted requirements for a project.
Returns requirements that exist in history but have been deleted.
User must be a member of the project.
Args:
project_id: The project to get deleted requirements for
Returns:
List of deleted requirements with their last known state.
"""
user = await _get_current_user_db(request, db)
# Verify user is a member of the project
await _verify_project_membership(project_id, user.id, db)
# Get deleted requirements
req_repo = RequirementRepository(db)
deleted = await req_repo.get_deleted_requirements(project_id)
return [DeletedRequirementResponse(**d) for d in deleted]
# ===========================================
# Validation Endpoints
# ===========================================
@app.get("/api/validation-statuses", response_model=List[ValidationStatusResponse])
async def get_validation_statuses(db: AsyncSession = Depends(get_db)):
"""
Get all validation statuses.
Returns:
List of validation statuses (Approved, Denied, Partial, Not Validated).
"""
status_repo = ValidationStatusRepository(db)
statuses = await status_repo.get_all()
return [ValidationStatusResponse.model_validate(s) for s in statuses]
@app.post("/api/requirements/{requirement_id}/validations", response_model=ValidationHistoryResponse, status_code=status.HTTP_201_CREATED)
async def create_validation(
requirement_id: int,
request: Request,
validation_data: ValidationCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new validation for a requirement.
Only auditors (role_id=2) can validate requirements.
Args:
requirement_id: The requirement to validate
validation_data: The validation status and optional comment
Returns:
The created validation record.
"""
user = await _get_current_user_db(request, db)
# Only auditors (role_id=2) can validate
_require_role(user, [2], "validate requirements")
# Check if requirement exists and user has access
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
# Verify user is a member of the requirement's project
await _verify_project_membership(requirement.project_id, user.id, db)
# Verify status exists
status_repo = ValidationStatusRepository(db)
validation_status = await status_repo.get_by_id(validation_data.status_id)
if not validation_status:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid validation status id {validation_data.status_id}"
)
# Create the validation
validation_repo = ValidationRepository(db)
validation = await validation_repo.create(
requirement_id=requirement_id,
user_id=user.id,
status_id=validation_data.status_id,
req_version_snapshot=requirement.version,
comment=validation_data.comment
)
await db.commit()
return ValidationHistoryResponse(
id=validation.id,
status_name=validation_status.status_name,
status_id=validation.status_id,
req_version_snapshot=validation.req_version_snapshot,
comment=validation.comment,
created_at=validation.created_at,
validator_username=_get_display_name(user),
validator_id=user.id
)
@app.get("/api/requirements/{requirement_id}/validations", response_model=List[ValidationHistoryResponse])
async def get_validation_history(
requirement_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get the validation history for a requirement.
Returns all validations ordered by date (newest first).
Args:
requirement_id: The requirement to get validation history for
Returns:
List of validation records with validator info.
"""
user = await _get_current_user_db(request, db)
# Check if requirement exists
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
# Verify user is a member of the requirement's project
await _verify_project_membership(requirement.project_id, user.id, db)
# Get validation history
validation_repo = ValidationRepository(db)
validations = await validation_repo.get_by_requirement_id(requirement_id)
return [
ValidationHistoryResponse(
id=v.id,
status_name=v.status.status_name,
status_id=v.status_id,
req_version_snapshot=v.req_version_snapshot,
comment=v.comment,
created_at=v.created_at,
validator_username=_get_display_name(v.user),
validator_id=v.user_id
)
for v in validations
]
# ===========================================
# Relationship Types Endpoints
# ===========================================
@app.get("/api/projects/{project_id}/relationship-types", response_model=List[RelationshipTypeResponse])
async def get_project_relationship_types(
project_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get all relationship types for a project.
User must be a member of the project.
Args:
project_id: The project ID
Returns:
List of relationship types for the project.
"""
user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, user.id, db)
rel_type_repo = RelationshipTypeRepository(db)
rel_types = await rel_type_repo.get_by_project_id(project_id)
return [RelationshipTypeResponse.model_validate(rt) for rt in rel_types]
@app.post("/api/projects/{project_id}/relationship-types", response_model=RelationshipTypeResponse, status_code=status.HTTP_201_CREATED)
async def create_relationship_type(
project_id: int,
request: Request,
type_data: RelationshipTypeCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new relationship type for a project.
Only project admins (role_id=3) can create relationship types.
Args:
project_id: The project ID
type_data: The relationship type data
Returns:
The created relationship type.
"""
user = await _get_current_user_db(request, db)
# Only admins (role_id=3) can create relationship types
_require_role(user, [3], "create relationship types")
await _verify_project_membership(project_id, user.id, db)
rel_type_repo = RelationshipTypeRepository(db)
rel_type = await rel_type_repo.create(
project_id=project_id,
type_name=type_data.type_name,
type_description=type_data.type_description,
inverse_type_name=type_data.inverse_type_name
)
await db.commit()
return RelationshipTypeResponse.model_validate(rel_type)
# ===========================================
# Requirement Search Endpoint (for autocomplete)
# ===========================================
@app.get("/api/projects/{project_id}/requirements/search", response_model=List[RequirementSearchResult])
async def search_requirements(
project_id: int,
request: Request,
q: str = "",
exclude_id: Optional[int] = None,
db: AsyncSession = Depends(get_db)
):
"""
Search requirements by name or tag code for autocomplete.
User must be a member of the project.
Args:
project_id: The project ID
q: Search query (searches tag_code and req_name)
exclude_id: Optional requirement ID to exclude from results (to prevent self-linking)
Returns:
List of matching requirements (limited to 20).
"""
user = await _get_current_user_db(request, db)
await _verify_project_membership(project_id, user.id, db)
req_repo = RequirementRepository(db)
requirements = await req_repo.search_by_name_or_tag(
project_id=project_id,
query=q,
exclude_id=exclude_id,
limit=20
)
return [
RequirementSearchResult(
id=req.id,
req_name=req.req_name,
tag_code=req.tag.tag_code
)
for req in requirements
]
# ===========================================
# Requirement Links Endpoints
# ===========================================
@app.get("/api/requirements/{requirement_id}/links", response_model=List[RequirementLinkResponse])
async def get_requirement_links(
requirement_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get all links for a requirement (both outgoing and incoming).
User must be a member of the requirement's project.
Args:
requirement_id: The requirement ID
Returns:
List of links with direction info.
"""
user = await _get_current_user_db(request, db)
# Check if requirement exists
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
await _verify_project_membership(requirement.project_id, user.id, db)
link_repo = RequirementLinkRepository(db)
links = await link_repo.get_by_requirement_id(requirement_id)
return [RequirementLinkResponse(**link) for link in links]
@app.post("/api/requirements/{requirement_id}/links", response_model=RequirementLinkResponse, status_code=status.HTTP_201_CREATED)
async def create_requirement_link(
requirement_id: int,
request: Request,
link_data: RequirementLinkCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new link from this requirement to another.
User must be a member of the project.
Auditors (role_id=2) cannot create links.
Args:
requirement_id: The source requirement ID
link_data: The target requirement and relationship type
Returns:
The created link.
"""
user = await _get_current_user_db(request, db)
# Auditors cannot create links
_require_role(user, [1, 3], "create requirement links")
req_repo = RequirementRepository(db)
# Check if source requirement exists
source_req = await req_repo.get_by_id(requirement_id)
if not source_req:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
await _verify_project_membership(source_req.project_id, user.id, db)
# Check if target requirement exists
target_req = await req_repo.get_by_id(link_data.target_requirement_id)
if not target_req:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Target requirement with id {link_data.target_requirement_id} not found"
)
# Prevent self-linking
if requirement_id == link_data.target_requirement_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot link a requirement to itself"
)
# Verify both requirements are in the same project
if source_req.project_id != target_req.project_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot link requirements from different projects"
)
# Verify relationship type exists and belongs to the project
rel_type_repo = RelationshipTypeRepository(db)
rel_type = await rel_type_repo.get_by_id(link_data.relationship_type_id)
if not rel_type:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Relationship type with id {link_data.relationship_type_id} not found"
)
if rel_type.project_id != source_req.project_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Relationship type does not belong to this project"
)
# Check if link already exists
link_repo = RequirementLinkRepository(db)
if await link_repo.link_exists(requirement_id, link_data.target_requirement_id, link_data.relationship_type_id):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="This link already exists"
)
# Create the link
link = await link_repo.create(
source_req_id=requirement_id,
target_req_id=link_data.target_requirement_id,
relationship_type_id=link_data.relationship_type_id,
created_by=user.id
)
await db.commit()
return RequirementLinkResponse(
id=link.id,
direction="outgoing",
type_name=rel_type.type_name,
type_id=link.relationship_type_id,
inverse_type_name=rel_type.inverse_type_name,
linked_requirement={
"id": target_req.id,
"req_name": target_req.req_name,
"tag_code": target_req.tag.tag_code
},
created_by_username=_get_display_name(user),
created_by_id=user.id,
created_at=link.created_at
)
@app.delete("/api/requirement-links/{link_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_requirement_link(
link_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Delete a requirement link.
Only the creator or an admin can delete a link.
Args:
link_id: The link ID to delete
"""
user = await _get_current_user_db(request, db)
link_repo = RequirementLinkRepository(db)
link = await link_repo.get_by_id(link_id)
if not link:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Link with id {link_id} not found"
)
# Verify user is a member of the project
await _verify_project_membership(link.source_requirement.project_id, user.id, db)
# Only creator or admin can delete
if link.created_by != user.id and user.role_id != 1:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the link creator or an admin can delete this link"
)
await link_repo.delete(link_id)
await db.commit()
# ===========================================
# Comment Endpoints
# ===========================================
def _build_comment_response(comment) -> CommentResponse:
"""Helper function to build CommentResponse from a RequirementComment model."""
return CommentResponse(
id=comment.id,
comment_text=comment.comment_text,
created_at=comment.created_at,
updated_at=comment.updated_at,
author_id=comment.user_id,
author_username=comment.user.username if comment.user else None,
author_full_name=comment.user.full_name if comment.user else None,
author_role=comment.user.role.role_name if comment.user and comment.user.role else None,
replies=[
CommentReplyResponse(
id=reply.id,
reply_text=reply.reply_text,
created_at=reply.created_at,
updated_at=reply.updated_at,
author_id=reply.user_id,
author_username=reply.user.username if reply.user else None,
author_full_name=reply.user.full_name if reply.user else None,
author_role=reply.user.role.role_name if reply.user and reply.user.role else None,
)
for reply in comment.replies
]
)
@app.get("/api/requirements/{requirement_id}/comments", response_model=List[CommentResponse])
async def get_requirement_comments(
requirement_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Get all comments for a requirement with their replies.
User must be a member of the requirement's project.
Args:
requirement_id: The requirement ID
Returns:
List of comments with nested replies.
"""
user = await _get_current_user_db(request, db)
# Check if requirement exists
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
await _verify_project_membership(requirement.project_id, user.id, db)
comment_repo = CommentRepository(db)
comments = await comment_repo.get_comments_by_requirement_id(requirement_id)
return [_build_comment_response(c) for c in comments]
@app.post("/api/requirements/{requirement_id}/comments", response_model=CommentResponse, status_code=status.HTTP_201_CREATED)
async def create_comment(
requirement_id: int,
request: Request,
comment_data: CommentCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new comment on a requirement.
Any project member can create comments.
Args:
requirement_id: The requirement to comment on
comment_data: The comment content
Returns:
The created comment.
"""
user = await _get_current_user_db(request, db)
# Check if requirement exists
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
await _verify_project_membership(requirement.project_id, user.id, db)
# Validate comment text
if not comment_data.comment_text.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Comment text cannot be empty"
)
comment_repo = CommentRepository(db)
comment = await comment_repo.create_comment(
requirement_id=requirement_id,
user_id=user.id,
comment_text=comment_data.comment_text.strip()
)
await db.commit()
# Fetch the complete comment with user data
comment = await comment_repo.get_comment_by_id(comment.id)
return _build_comment_response(comment)
@app.delete("/api/comments/{comment_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_comment(
comment_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Soft delete a comment (hides it from view).
Only the comment author or an admin can delete.
Args:
comment_id: The comment ID to delete
"""
user = await _get_current_user_db(request, db)
comment_repo = CommentRepository(db)
comment = await comment_repo.get_comment_by_id(comment_id)
if not comment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Comment with id {comment_id} not found"
)
# Verify user is a member of the project
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(comment.requirement_id)
await _verify_project_membership(requirement.project_id, user.id, db)
# Only author or admin can delete
if comment.user_id != user.id and user.role_id != 3:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the comment author or an admin can delete this comment"
)
await comment_repo.soft_delete_comment(comment_id)
await db.commit()
@app.post("/api/comments/{comment_id}/replies", response_model=CommentReplyResponse, status_code=status.HTTP_201_CREATED)
async def create_reply(
comment_id: int,
request: Request,
reply_data: ReplyCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a reply to a comment.
Any project member can create replies.
Args:
comment_id: The parent comment ID
reply_data: The reply content
Returns:
The created reply.
"""
user = await _get_current_user_db(request, db)
# Check if parent comment exists
comment_repo = CommentRepository(db)
parent_comment = await comment_repo.get_comment_by_id(comment_id)
if not parent_comment:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Comment with id {comment_id} not found"
)
if parent_comment.is_deleted:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot reply to a deleted comment"
)
# Verify user is a member of the project
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(parent_comment.requirement_id)
await _verify_project_membership(requirement.project_id, user.id, db)
# Validate reply text
if not reply_data.reply_text.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Reply text cannot be empty"
)
reply_repo = ReplyRepository(db)
reply = await reply_repo.create_reply(
parent_comment_id=comment_id,
user_id=user.id,
reply_text=reply_data.reply_text.strip()
)
await db.commit()
# Fetch the complete reply with user data
reply = await reply_repo.get_reply_by_id(reply.id)
return CommentReplyResponse(
id=reply.id,
reply_text=reply.reply_text,
created_at=reply.created_at,
updated_at=reply.updated_at,
author_id=reply.user_id,
author_username=reply.user.username if reply.user else None,
author_full_name=reply.user.full_name if reply.user else None,
author_role=reply.user.role.role_name if reply.user and reply.user.role else None,
)
@app.delete("/api/replies/{reply_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_reply(
reply_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Soft delete a reply (hides it from view).
Only the reply author or an admin can delete.
Args:
reply_id: The reply ID to delete
"""
user = await _get_current_user_db(request, db)
reply_repo = ReplyRepository(db)
reply = await reply_repo.get_reply_by_id(reply_id)
if not reply:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Reply with id {reply_id} not found"
)
# Get the parent comment to check project membership
comment_repo = CommentRepository(db)
parent_comment = await comment_repo.get_comment_by_id(reply.parent_comment_id)
# Verify user is a member of the project
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(parent_comment.requirement_id)
await _verify_project_membership(requirement.project_id, user.id, db)
# Only author or admin can delete
if reply.user_id != user.id and user.role_id != 3:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only the reply author or an admin can delete this reply"
)
await reply_repo.soft_delete_reply(reply_id)
await db.commit()