Added admin page
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
Repository layer for User database operations.
|
||||
Handles CRUD operations and user provisioning on first login.
|
||||
"""
|
||||
from typing import Optional
|
||||
from typing import Optional, List
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
@@ -11,8 +11,64 @@ import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default role for new users
|
||||
DEFAULT_ROLE_NAME = "user"
|
||||
# Default role for new users (editor = role_id 1)
|
||||
DEFAULT_ROLE_NAME = "editor"
|
||||
|
||||
|
||||
class RoleRepository:
|
||||
"""Repository for Role-related database operations."""
|
||||
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.session = session
|
||||
|
||||
async def get_by_name(self, role_name: str) -> Optional[Role]:
|
||||
"""Get a role by name."""
|
||||
result = await self.session.execute(
|
||||
select(Role).where(Role.role_name == role_name)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def get_by_id(self, role_id: int) -> Optional[Role]:
|
||||
"""Get a role by ID."""
|
||||
result = await self.session.execute(
|
||||
select(Role).where(Role.id == role_id)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def create(self, role_name: str) -> Role:
|
||||
"""Create a new role."""
|
||||
role = Role(role_name=role_name)
|
||||
self.session.add(role)
|
||||
await self.session.flush()
|
||||
await self.session.refresh(role)
|
||||
return role
|
||||
|
||||
async def get_all(self) -> List[Role]:
|
||||
"""
|
||||
Get all roles.
|
||||
|
||||
Returns:
|
||||
List of all roles
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(Role).order_by(Role.id)
|
||||
)
|
||||
return list(result.scalars().all())
|
||||
|
||||
async def ensure_default_roles_exist(self) -> None:
|
||||
"""
|
||||
Ensure default roles exist in the database.
|
||||
Called during application startup.
|
||||
Creates roles in order: editor (1), auditor (2), admin (3)
|
||||
"""
|
||||
# Order matters for role IDs: editor=1, auditor=2, admin=3
|
||||
default_roles = ["editor", "auditor", "admin"]
|
||||
|
||||
for role_name in default_roles:
|
||||
existing = await self.get_by_name(role_name)
|
||||
if existing is None:
|
||||
logger.info(f"Creating default role: {role_name}")
|
||||
await self.create(role_name)
|
||||
|
||||
|
||||
class UserRepository:
|
||||
@@ -72,6 +128,26 @@ class UserRepository:
|
||||
await self.session.refresh(user)
|
||||
return user
|
||||
|
||||
async def update_role(self, user_id: int, role_id: int) -> Optional[User]:
|
||||
"""
|
||||
Update a user's role.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to update
|
||||
role_id: The new role ID
|
||||
|
||||
Returns:
|
||||
The updated User, or None if not found
|
||||
"""
|
||||
user = await self.get_by_id(user_id)
|
||||
if not user:
|
||||
return None
|
||||
|
||||
user.role_id = role_id
|
||||
await self.session.flush()
|
||||
await self.session.refresh(user)
|
||||
return user
|
||||
|
||||
async def get_or_create_default_role(self) -> Role:
|
||||
"""
|
||||
Get the default user role, creating it if it doesn't exist.
|
||||
@@ -79,17 +155,12 @@ class UserRepository:
|
||||
Returns:
|
||||
The default Role
|
||||
"""
|
||||
result = await self.session.execute(
|
||||
select(Role).where(Role.role_name == DEFAULT_ROLE_NAME)
|
||||
)
|
||||
role = result.scalar_one_or_none()
|
||||
role_repo = RoleRepository(self.session)
|
||||
role = await role_repo.get_by_name(DEFAULT_ROLE_NAME)
|
||||
|
||||
if role is None:
|
||||
logger.info(f"Creating default role: {DEFAULT_ROLE_NAME}")
|
||||
role = Role(role_name=DEFAULT_ROLE_NAME)
|
||||
self.session.add(role)
|
||||
await self.session.flush()
|
||||
await self.session.refresh(role)
|
||||
role = await role_repo.create(DEFAULT_ROLE_NAME)
|
||||
|
||||
return role
|
||||
|
||||
@@ -122,45 +193,3 @@ class UserRepository:
|
||||
|
||||
logger.info(f"Created new user with id: {user.id}, sub: {sub}")
|
||||
return user, True
|
||||
|
||||
|
||||
class RoleRepository:
|
||||
"""Repository for Role-related database operations."""
|
||||
|
||||
def __init__(self, session: AsyncSession):
|
||||
self.session = session
|
||||
|
||||
async def get_by_name(self, role_name: str) -> Optional[Role]:
|
||||
"""Get a role by name."""
|
||||
result = await self.session.execute(
|
||||
select(Role).where(Role.role_name == role_name)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def get_by_id(self, role_id: int) -> Optional[Role]:
|
||||
"""Get a role by ID."""
|
||||
result = await self.session.execute(
|
||||
select(Role).where(Role.id == role_id)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def create(self, role_name: str) -> Role:
|
||||
"""Create a new role."""
|
||||
role = Role(role_name=role_name)
|
||||
self.session.add(role)
|
||||
await self.session.flush()
|
||||
await self.session.refresh(role)
|
||||
return role
|
||||
|
||||
async def ensure_default_roles_exist(self) -> None:
|
||||
"""
|
||||
Ensure default roles exist in the database.
|
||||
Called during application startup.
|
||||
"""
|
||||
default_roles = ["admin", "user", "viewer"]
|
||||
|
||||
for role_name in default_roles:
|
||||
existing = await self.get_by_name(role_name)
|
||||
if existing is None:
|
||||
logger.info(f"Creating default role: {role_name}")
|
||||
await self.create(role_name)
|
||||
|
||||
Reference in New Issue
Block a user