from fastapi import FastAPI, Depends, HTTPException, Request, status from fastapi.staticfiles import StaticFiles from sqlalchemy.orm import Session from sqlalchemy.exc import IntegrityError from fastapi.responses import RedirectResponse, JSONResponse, HTMLResponse, FileResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials import secrets import logging from backend.database import get_db, engine from backend.models import Base, TimeEntry import backend.crud as crud import backend.schemas as schemas from datetime import datetime, timezone from backend.exceptions import UserNotFoundException, UserAlreadyClockedInException, NoClockInFoundException, UserAlreadyClockedOutException, UserAlreadyExists import bcrypt import os from sqlalchemy import func from dateutil.parser import isoparse # Basic Auth Setup security = HTTPBasic() app = FastAPI() app.mount("/frontend", StaticFiles(directory="frontend"), name="frontend") app.mount("/static", StaticFiles(directory="static"), name="static") Base.metadata.create_all(bind=engine) # Use Uvicorn's logger logger = logging.getLogger("uvicorn") # Retrieve username from environment variables ENV_USERNAME = os.getenv("ADMIN_USERNAME", "admin") # Ensure the admin user exists with the hashed password def ensure_admin_user_exists(db: Session): admin_user = crud.get_user_by_name(db, name=ENV_USERNAME) if not admin_user: # Retrieve password from environment variables env_password = os.getenv("ADMIN_PASSWORD") # Generate a random password if ADMIN_PASSWORD is not set if not env_password: env_password = secrets.token_urlsafe(16) # Random secure password logger.warning(f"\n\nGenerated admin password: \n{env_password}\n\n") # Log this for initial use # Hash the password hashed_env_password = bcrypt.hashpw(env_password.encode('utf-8'), bcrypt.gensalt()) # Create the admin user with the hashed password admin_data = schemas.UserCreate(name=ENV_USERNAME) crud.create_user(db=db, user=admin_data, is_admin=True, hashed_password=hashed_env_password) @app.on_event("startup") def on_startup(): # Ensure the admin user exists on startup db = next(get_db()) ensure_admin_user_exists(db) def verify_password(plain_password, hashed_password): if not hashed_password: return False # If no password is set, it's an automatic failure return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password) def get_current_username(credentials: HTTPBasicCredentials = Depends(security), db: Session = Depends(get_db)): user = crud.get_user_by_name(db, name=credentials.username) # Check if the user is the admin and verify the password if credentials.username == ENV_USERNAME: if not user or not verify_password(credentials.password, user.password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Basic"}, ) else: # Non-admin users should not have a password if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Basic"}, ) return credentials.username # Custom Exception Handlers @app.exception_handler(UserAlreadyExists) async def user_already_exists_exception_handler(request: Request, exc: UserAlreadyExists): logger.error(f"UserAlreadyExists: {exc.message}") return JSONResponse( status_code=400, content={"message": f"User '{exc.message}' already exists."}, ) @app.exception_handler(UserAlreadyClockedOutException) async def user_already_clocked_out_exception_handler(request: Request, exc: UserAlreadyClockedOutException): logger.error(f"UserAlreadyClockedOutException: {exc.message}") return JSONResponse( status_code=400, content={"message": exc.message}, ) @app.exception_handler(UserNotFoundException) async def user_not_found_exception_handler(request: Request, exc: UserNotFoundException): logger.error(f"UserNotFoundException: {exc.message}") return JSONResponse( status_code=404, content={"message": exc.message}, ) @app.exception_handler(UserAlreadyClockedInException) async def user_already_clocked_in_exception_handler(request: Request, exc: UserAlreadyClockedInException): logger.error(f"UserAlreadyClockedInException: {exc.message}") return JSONResponse( status_code=400, content={"message": exc.message}, ) @app.exception_handler(NoClockInFoundException) async def no_clock_in_found_exception_handler(request: Request, exc: NoClockInFoundException): logger.error(f"NoClockInFoundException: {exc.message}") return JSONResponse( status_code=400, content={"message": exc.message}, ) @app.exception_handler(Exception) async def custom_exception_handler(request: Request, exc: Exception): logger.error(f"Unhandled Exception: {exc}") return JSONResponse( status_code=500, content={"message": "Internal Server Error"}, ) @app.get("/") def read_root(): with open("frontend/index.html") as f: return HTMLResponse(content=f.read()) @app.get("/favicon.ico", include_in_schema=False) async def favicon(): return FileResponse("static/favicon.ico") @app.post("/user/create", response_model=schemas.User) def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)): username_lower = user.name.lower() logger.info(f"create_user: A request has been made for {username_lower}") try: # Try to create the user return crud.create_user(db=db, user=user) except UserAlreadyExists as e: # Handle the specific exception and provide a user-friendly message raise HTTPException(status_code=400, detail=str(e)) except IntegrityError: # Handle the race condition db.rollback() raise HTTPException(status_code=400, detail="User already registered") @app.post("/time/{user}/in") def clock_in(user: str, time: str = None, note: str = None, db: Session = Depends(get_db)): username_lower = user.lower() logger.info(f"clock_in: A request has been made for {username_lower}") # Parse the time parameter if provided if time: try: parsed_time = datetime.fromisoformat(time) if parsed_time.tzinfo is None: parsed_time = parsed_time.replace(tzinfo=timezone.utc) else: parsed_time = parsed_time.astimezone(timezone.utc) except ValueError: raise HTTPException(status_code=400, detail="Invalid time format. Use ISO 8601 format.") else: parsed_time = None # The CRUD function will handle defaulting to current UTC time return crud.clock_in(db=db, user=username_lower, time=parsed_time, note=note) @app.post("/time/{user}/out") def clock_out(user: str, time: str = None, note: str = None, db: Session = Depends(get_db)): username_lower = user.lower() if time: try: parsed_time = datetime.fromisoformat(time) if parsed_time.tzinfo is None: parsed_time = parsed_time.replace(tzinfo=timezone.utc) else: parsed_time = parsed_time.astimezone(timezone.utc) except ValueError: raise HTTPException(status_code=400, detail="Invalid time format. Use ISO 8601 format.") else: parsed_time = None return crud.clock_out(db=db, user=username_lower, time=parsed_time, note=note) @app.get("/time/{user}/recall/payperiod", response_model=schemas.PeriodSummary) def get_pay_period(user: str, db: Session = Depends(get_db)): username_lower = user.lower() return crud.get_time_for_pay_period(db=db, user=username_lower) @app.get("/time/{user}/recall/month", response_model=schemas.PeriodSummary) def get_time_for_month(user: str, db: Session = Depends(get_db)): username_lower = user.lower() period_summary = crud.get_time_for_month(db=db, user=username_lower) return period_summary @app.get("/time/{user}/recall/week", response_model=schemas.PeriodSummary) def get_current_week(user: str, db: Session = Depends(get_db)): username_lower = user.lower() return crud.get_time_for_current_week(db=db, user=username_lower) @app.get("/user/status/{user}") def get_user_status(user: str, db: Session = Depends(get_db)): username_lower = user.lower() try: return crud.get_user_status(db=db, user=username_lower) except UserNotFoundException as e: raise HTTPException(status_code=404, detail=str(e)) @app.delete("/time/{user}/today", response_model=schemas.Message) def delete_today_time_entry(user: str, db: Session = Depends(get_db)): try: crud.delete_today_entry(db, user.lower()) return {"message": f"Today's clock-in and clock-out times for {user} have been deleted."} except UserNotFoundException as e: return {"message": str(e)} except NoClockInFoundException: return {"message": "No clock-in found for today."} @app.get("/time/{user}/is_clocked_in_today") def check_clocked_in_today(user: str, db: Session = Depends(get_db)): if not crud.is_clocked_in_today(db, user.lower()): return {"clocked_in_today": False} return {"clocked_in_today": True} @app.delete("/user/{username}", response_model=schemas.Message) def delete_user(username: str, db: Session = Depends(get_db), admin_username: str = Depends(get_current_username)): # Only the admin should be able to delete a user if admin_username != ENV_USERNAME: raise HTTPException(status_code=403, detail="Only the admin can delete users") username = username.lower() success = crud.delete_user_by_name(db=db, username=username) if success: return {"message": f"User '{username}' has been deleted."} else: raise HTTPException(status_code=404, detail="User not found") @app.get("/edit", response_class=HTMLResponse) def edit_page(): with open("frontend/edit.html") as f: return HTMLResponse(content=f.read()) @app.get("/users") def get_users(db: Session = Depends(get_db)): users = crud.get_users(db=db) return [{"name": user.name} for user in users] @app.post("/time/{user}/edit") def edit_clock_times(user: str, data: schemas.EditClockTimes, db: Session = Depends(get_db)): db_user = crud.get_user_by_name(db, user) if not db_user: raise HTTPException(status_code=404, detail="User not found") try: # Parse the date string into a date object target_date = datetime.strptime(data.date, "%Y-%m-%d").date() except ValueError: raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD.") # Query for the time entry on the specified date time_entry = db.query(TimeEntry).filter( TimeEntry.user_id == db_user.id, func.date(TimeEntry.clock_in) == target_date ).first() if not time_entry: raise HTTPException(status_code=404, detail="No time entry found for this date") if data.clock_in_time: # Parse the ISO datetime string with timezone information time_entry.clock_in = isoparse(data.clock_in_time) if time_entry.clock_in.tzinfo is None: time_entry.clock_in = time_entry.clock_in.replace(tzinfo=timezone.utc) else: time_entry.clock_in = time_entry.clock_in.astimezone(timezone.utc) if data.clock_out_time: time_entry.clock_out = isoparse(data.clock_out_time) if time_entry.clock_out.tzinfo is None: time_entry.clock_out = time_entry.clock_out.replace(tzinfo=timezone.utc) else: time_entry.clock_out = time_entry.clock_out.astimezone(timezone.utc) db.commit() db.refresh(time_entry) return {"message": "Clock times updated successfully"}