Added functions to fetch requirements from db.

Added functionality to create requirements on page
This commit is contained in:
gulimabr
2025-11-30 15:35:36 -03:00
parent bbbe65067b
commit eb70598cab
11 changed files with 1383 additions and 148 deletions

View File

@@ -1,15 +1,19 @@
from contextlib import asynccontextmanager
from typing import List
from fastapi import FastAPI, Depends, Request
from typing import List, Optional
from fastapi import FastAPI, Depends, Request, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import RedirectResponse
from sqlalchemy.ext.asyncio import AsyncSession
from src.models import TokenResponse, UserInfo, GroupResponse
from src.models import (
TokenResponse, UserInfo, GroupResponse,
TagResponse, RequirementResponse, PriorityResponse,
RequirementCreateRequest, RequirementUpdateRequest
)
from src.controller import AuthController
from src.config import get_openid, get_settings
from src.database import init_db, close_db, get_db
from src.repositories import RoleRepository, GroupRepository
from src.repositories import RoleRepository, GroupRepository, TagRepository, RequirementRepository, PriorityRepository
import logging
# Configure logging
@@ -176,3 +180,254 @@ async def get_groups(db: AsyncSession = Depends(get_db)):
group_repo = GroupRepository(db)
groups = await group_repo.get_all()
return [GroupResponse.model_validate(g) for g in groups]
# ===========================================
# Tags Endpoints
# ===========================================
@app.get("/api/tags", response_model=List[TagResponse])
async def get_tags(db: AsyncSession = Depends(get_db)):
"""
Get all tags.
Returns:
List of all tags with their codes and descriptions.
"""
tag_repo = TagRepository(db)
tags = await tag_repo.get_all()
return [TagResponse.model_validate(t) for t in tags]
@app.get("/api/tags/{tag_id}", response_model=TagResponse)
async def get_tag(tag_id: int, db: AsyncSession = Depends(get_db)):
"""
Get a specific tag by ID.
Args:
tag_id: The tag ID
Returns:
The tag if found.
"""
tag_repo = TagRepository(db)
tag = await tag_repo.get_by_id(tag_id)
if not tag:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Tag with id {tag_id} not found"
)
return TagResponse.model_validate(tag)
# ===========================================
# Priorities Endpoints
# ===========================================
@app.get("/api/priorities", response_model=List[PriorityResponse])
async def get_priorities(db: AsyncSession = Depends(get_db)):
"""
Get all priorities.
Returns:
List of all priorities ordered by priority_num.
"""
priority_repo = PriorityRepository(db)
priorities = await priority_repo.get_all()
return [PriorityResponse.model_validate(p) for p in priorities]
# ===========================================
# Requirements Endpoints
# ===========================================
def _build_requirement_response(req) -> RequirementResponse:
"""Helper function to build RequirementResponse from a Requirement model."""
# Determine validation status from latest validation
validation_status = "Not Validated"
if req.validations:
# Get the latest validation
latest_validation = max(req.validations, key=lambda v: v.created_at or req.created_at)
validation_status = latest_validation.status.status_name if latest_validation.status else "Not Validated"
return RequirementResponse(
id=req.id,
req_name=req.req_name,
req_desc=req.req_desc,
version=req.version,
created_at=req.created_at,
updated_at=req.updated_at,
tag=TagResponse.model_validate(req.tag),
priority=req.priority if req.priority else None,
groups=[GroupResponse.model_validate(g) for g in req.groups],
validation_status=validation_status,
)
@app.get("/api/requirements", response_model=List[RequirementResponse])
async def get_requirements(
group_id: Optional[int] = None,
tag_id: Optional[int] = None,
db: AsyncSession = Depends(get_db)
):
"""
Get all requirements, optionally filtered by group or tag.
Args:
group_id: Optional group ID to filter by
tag_id: Optional tag ID to filter by
Returns:
List of all requirements with their related data.
"""
req_repo = RequirementRepository(db)
if group_id:
requirements = await req_repo.get_by_group_id(group_id)
elif tag_id:
requirements = await req_repo.get_by_tag_id(tag_id)
else:
requirements = await req_repo.get_all()
return [_build_requirement_response(req) for req in requirements]
@app.get("/api/requirements/{requirement_id}", response_model=RequirementResponse)
async def get_requirement(requirement_id: int, db: AsyncSession = Depends(get_db)):
"""
Get a specific requirement by ID.
Args:
requirement_id: The requirement ID
Returns:
The requirement if found.
"""
req_repo = RequirementRepository(db)
requirement = await req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
return _build_requirement_response(requirement)
@app.post("/api/requirements", response_model=RequirementResponse, status_code=status.HTTP_201_CREATED)
async def create_requirement(
request: Request,
req_data: RequirementCreateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Create a new requirement.
Args:
req_data: The requirement data
Returns:
The created requirement.
"""
# Get the current user from cookie
user_info = AuthController.get_current_user(request)
# Get the user's database ID
from src.repositories import UserRepository
user_repo = UserRepository(db)
user = await user_repo.get_by_sub(user_info.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found in database"
)
req_repo = RequirementRepository(db)
requirement = await req_repo.create(
user_id=user.id,
tag_id=req_data.tag_id,
req_name=req_data.req_name,
req_desc=req_data.req_desc,
priority_id=req_data.priority_id,
group_ids=req_data.group_ids,
)
await db.commit()
return _build_requirement_response(requirement)
@app.put("/api/requirements/{requirement_id}", response_model=RequirementResponse)
async def update_requirement(
requirement_id: int,
request: Request,
req_data: RequirementUpdateRequest,
db: AsyncSession = Depends(get_db)
):
"""
Update an existing requirement.
Args:
requirement_id: The requirement ID to update
req_data: The updated requirement data
Returns:
The updated requirement.
"""
# Get the current user from cookie
user_info = AuthController.get_current_user(request)
# Get the user's database ID
from src.repositories import UserRepository
user_repo = UserRepository(db)
user = await user_repo.get_by_sub(user_info.sub)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found in database"
)
req_repo = RequirementRepository(db)
requirement = await req_repo.update(
requirement_id=requirement_id,
editor_id=user.id,
req_name=req_data.req_name,
req_desc=req_data.req_desc,
tag_id=req_data.tag_id,
priority_id=req_data.priority_id,
group_ids=req_data.group_ids,
)
if not requirement:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
await db.commit()
return _build_requirement_response(requirement)
@app.delete("/api/requirements/{requirement_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_requirement(
requirement_id: int,
request: Request,
db: AsyncSession = Depends(get_db)
):
"""
Delete a requirement.
Args:
requirement_id: The requirement ID to delete
"""
# Verify user is authenticated
AuthController.get_current_user(request)
req_repo = RequirementRepository(db)
deleted = await req_repo.delete(requirement_id)
if not deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Requirement with id {requirement_id} not found"
)
await db.commit()

View File

@@ -1,4 +1,5 @@
from typing import Optional, List
from datetime import datetime
from pydantic import BaseModel, SecretStr
@@ -35,3 +36,84 @@ class GroupResponse(BaseModel):
class GroupListResponse(BaseModel):
"""Response schema for list of groups."""
groups: List[GroupResponse]
# Tag schemas
class TagResponse(BaseModel):
"""Response schema for a single tag."""
id: int
tag_code: str
tag_description: str
class Config:
from_attributes = True
class TagListResponse(BaseModel):
"""Response schema for list of tags."""
tags: List[TagResponse]
# Priority schemas
class PriorityResponse(BaseModel):
"""Response schema for a priority."""
id: int
priority_name: str
priority_num: int
class Config:
from_attributes = True
# Validation schemas
class ValidationResponse(BaseModel):
"""Response schema for a validation."""
id: int
status_name: str
req_version_snapshot: int
comment: Optional[str] = None
created_at: Optional[datetime] = None
class Config:
from_attributes = True
# Requirement schemas
class RequirementResponse(BaseModel):
"""Response schema for a single requirement."""
id: int
req_name: str
req_desc: Optional[str] = None
version: int
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
tag: TagResponse
priority: Optional[PriorityResponse] = None
groups: List[GroupResponse] = []
validation_status: Optional[str] = None # Computed from latest validation
class Config:
from_attributes = True
class RequirementListResponse(BaseModel):
"""Response schema for list of requirements."""
requirements: List[RequirementResponse]
class RequirementCreateRequest(BaseModel):
"""Request schema for creating a requirement."""
tag_id: int
req_name: str
req_desc: Optional[str] = None
priority_id: Optional[int] = None
group_ids: Optional[List[int]] = None
class RequirementUpdateRequest(BaseModel):
"""Request schema for updating a requirement."""
req_name: Optional[str] = None
req_desc: Optional[str] = None
tag_id: Optional[int] = None
priority_id: Optional[int] = None
group_ids: Optional[List[int]] = None

View File

@@ -3,9 +3,15 @@ Repository layer for database operations.
"""
from src.repositories.user_repository import UserRepository, RoleRepository
from src.repositories.group_repository import GroupRepository
from src.repositories.tag_repository import TagRepository
from src.repositories.requirement_repository import RequirementRepository
from src.repositories.priority_repository import PriorityRepository
__all__ = [
"UserRepository",
"RoleRepository",
"GroupRepository",
"TagRepository",
"RequirementRepository",
"PriorityRepository",
]

View File

@@ -0,0 +1,76 @@
"""
Repository layer for Priority database operations.
"""
from typing import List, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from src.db_models import Priority
import logging
logger = logging.getLogger(__name__)
class PriorityRepository:
"""Repository for Priority-related database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def get_all(self) -> List[Priority]:
"""
Get all priorities ordered by priority_num.
Returns:
List of all priorities
"""
result = await self.session.execute(
select(Priority).order_by(Priority.priority_num)
)
return list(result.scalars().all())
async def get_by_id(self, priority_id: int) -> Optional[Priority]:
"""
Get a priority by ID.
Args:
priority_id: The priority ID
Returns:
Priority if found, None otherwise
"""
result = await self.session.execute(
select(Priority).where(Priority.id == priority_id)
)
return result.scalar_one_or_none()
async def get_by_name(self, priority_name: str) -> Optional[Priority]:
"""
Get a priority by name.
Args:
priority_name: The priority name
Returns:
Priority if found, None otherwise
"""
result = await self.session.execute(
select(Priority).where(Priority.priority_name == priority_name)
)
return result.scalar_one_or_none()
async def create(self, priority_name: str, priority_num: int) -> Priority:
"""
Create a new priority.
Args:
priority_name: The priority name
priority_num: The priority number for ordering
Returns:
The created Priority
"""
priority = Priority(priority_name=priority_name, priority_num=priority_num)
self.session.add(priority)
await self.session.flush()
await self.session.refresh(priority)
return priority

View File

@@ -0,0 +1,227 @@
"""
Repository layer for Requirement database operations.
"""
from typing import List, Optional
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from src.db_models import Requirement, Group, Tag, Priority, Validation
import logging
logger = logging.getLogger(__name__)
class RequirementRepository:
"""Repository for Requirement-related database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def get_all(self) -> List[Requirement]:
"""
Get all requirements with their related data (tag, priority, groups, validations).
Returns:
List of all requirements with eager-loaded relationships
"""
result = await self.session.execute(
select(Requirement)
.options(
selectinload(Requirement.tag),
selectinload(Requirement.priority),
selectinload(Requirement.groups),
selectinload(Requirement.validations).selectinload(Validation.status),
)
.order_by(Requirement.created_at.desc())
)
return list(result.scalars().all())
async def get_by_id(self, requirement_id: int) -> Optional[Requirement]:
"""
Get a requirement by ID with its related data.
Args:
requirement_id: The requirement ID
Returns:
Requirement if found, None otherwise
"""
result = await self.session.execute(
select(Requirement)
.options(
selectinload(Requirement.tag),
selectinload(Requirement.priority),
selectinload(Requirement.groups),
selectinload(Requirement.validations).selectinload(Validation.status),
selectinload(Requirement.user),
selectinload(Requirement.last_editor),
)
.where(Requirement.id == requirement_id)
)
return result.scalar_one_or_none()
async def get_by_group_id(self, group_id: int) -> List[Requirement]:
"""
Get all requirements belonging to a specific group.
Args:
group_id: The group ID
Returns:
List of requirements in the group
"""
result = await self.session.execute(
select(Requirement)
.options(
selectinload(Requirement.tag),
selectinload(Requirement.priority),
selectinload(Requirement.groups),
selectinload(Requirement.validations).selectinload(Validation.status),
)
.join(Requirement.groups)
.where(Group.id == group_id)
.order_by(Requirement.created_at.desc())
)
return list(result.scalars().all())
async def get_by_tag_id(self, tag_id: int) -> List[Requirement]:
"""
Get all requirements with a specific tag.
Args:
tag_id: The tag ID
Returns:
List of requirements with the tag
"""
result = await self.session.execute(
select(Requirement)
.options(
selectinload(Requirement.tag),
selectinload(Requirement.priority),
selectinload(Requirement.groups),
selectinload(Requirement.validations).selectinload(Validation.status),
)
.where(Requirement.tag_id == tag_id)
.order_by(Requirement.created_at.desc())
)
return list(result.scalars().all())
async def create(
self,
user_id: int,
tag_id: int,
req_name: str,
req_desc: Optional[str] = None,
priority_id: Optional[int] = None,
group_ids: Optional[List[int]] = None,
) -> Requirement:
"""
Create a new requirement.
Args:
user_id: The creating user's ID
tag_id: The tag ID
req_name: The requirement name
req_desc: The requirement description (optional)
priority_id: The priority ID (optional)
group_ids: List of group IDs to associate (optional)
Returns:
The created Requirement
"""
requirement = Requirement(
user_id=user_id,
tag_id=tag_id,
req_name=req_name,
req_desc=req_desc,
priority_id=priority_id,
)
# Add groups if provided
if group_ids:
groups_result = await self.session.execute(
select(Group).where(Group.id.in_(group_ids))
)
groups = list(groups_result.scalars().all())
requirement.groups = groups
self.session.add(requirement)
await self.session.flush()
await self.session.refresh(requirement)
# Reload with relationships
return await self.get_by_id(requirement.id)
async def update(
self,
requirement_id: int,
editor_id: int,
req_name: Optional[str] = None,
req_desc: Optional[str] = None,
tag_id: Optional[int] = None,
priority_id: Optional[int] = None,
group_ids: Optional[List[int]] = None,
) -> Optional[Requirement]:
"""
Update an existing requirement.
Args:
requirement_id: The requirement ID to update
editor_id: The ID of the user making the edit
req_name: New requirement name (optional)
req_desc: New requirement description (optional)
tag_id: New tag ID (optional)
priority_id: New priority ID (optional)
group_ids: New list of group IDs (optional)
Returns:
The updated Requirement, or None if not found
"""
requirement = await self.get_by_id(requirement_id)
if not requirement:
return None
# Update fields if provided
if req_name is not None:
requirement.req_name = req_name
if req_desc is not None:
requirement.req_desc = req_desc
if tag_id is not None:
requirement.tag_id = tag_id
if priority_id is not None:
requirement.priority_id = priority_id
# Set the last editor
requirement.last_editor_id = editor_id
# Update groups if provided
if group_ids is not None:
groups_result = await self.session.execute(
select(Group).where(Group.id.in_(group_ids))
)
groups = list(groups_result.scalars().all())
requirement.groups = groups
await self.session.flush()
await self.session.refresh(requirement)
return await self.get_by_id(requirement_id)
async def delete(self, requirement_id: int) -> bool:
"""
Delete a requirement.
Args:
requirement_id: The requirement ID to delete
Returns:
True if deleted, False if not found
"""
requirement = await self.get_by_id(requirement_id)
if not requirement:
return False
await self.session.delete(requirement)
await self.session.flush()
return True

View File

@@ -0,0 +1,76 @@
"""
Repository layer for Tag database operations.
"""
from typing import List, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from src.db_models import Tag
import logging
logger = logging.getLogger(__name__)
class TagRepository:
"""Repository for Tag-related database operations."""
def __init__(self, session: AsyncSession):
self.session = session
async def get_all(self) -> List[Tag]:
"""
Get all tags.
Returns:
List of all tags
"""
result = await self.session.execute(
select(Tag).order_by(Tag.tag_code)
)
return list(result.scalars().all())
async def get_by_id(self, tag_id: int) -> Optional[Tag]:
"""
Get a tag by ID.
Args:
tag_id: The tag ID
Returns:
Tag if found, None otherwise
"""
result = await self.session.execute(
select(Tag).where(Tag.id == tag_id)
)
return result.scalar_one_or_none()
async def get_by_code(self, tag_code: str) -> Optional[Tag]:
"""
Get a tag by code.
Args:
tag_code: The tag code (e.g., GSR, SFR)
Returns:
Tag if found, None otherwise
"""
result = await self.session.execute(
select(Tag).where(Tag.tag_code == tag_code)
)
return result.scalar_one_or_none()
async def create(self, tag_code: str, tag_description: str) -> Tag:
"""
Create a new tag.
Args:
tag_code: The tag code (e.g., GSR, SFR)
tag_description: The tag description
Returns:
The created Tag
"""
tag = Tag(tag_code=tag_code, tag_description=tag_description)
self.session.add(tag)
await self.session.flush()
await self.session.refresh(tag)
return tag