from fastapi import APIRouter, Depends, HTTPException, status, Request from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session from datetime import timedelta from typing import Any from app.database import get_db from app.crud import user_crud from app.schemas import UserCreate, UserResponse, Token, LoginRequest from app.core.security import ( create_access_token, create_refresh_token, verify_password, get_password_hash ) from app.core.deps import get_current_user, get_client_info from app.models import User from app.core.config import settings import logging logger = logging.getLogger(__name__) router = APIRouter() @router.post("/register", response_model=UserResponse) async def register( *, db: Session = Depends(get_db), request: Request, user_in: UserCreate, ) -> Any: """ ثبت‌نام کاربر جدید """ try: user = user_crud.create(db, obj_in=user_in) return user except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) except Exception as e: logger.error(f"خطا در ثبت‌نام: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="خطای سرور", ) @router.post("/login", response_model=Token) async def login( *, db: Session = Depends(get_db), request: Request, login_data: LoginRequest, ) -> Any: """ ورود کاربر """ client_info = get_client_info(request) try: # احراز هویت user = user_crud.authenticate( db, username=login_data.username, password=login_data.password ) if not user: # افزایش شماره تلاش‌های ناموفق user_crud.increment_login_attempts(db, login_data.username) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="نام کاربری یا رمز عبور اشتباه است", headers={"WWW-Authenticate": "Bearer"}, ) # ایجاد سشن session = user_crud.create_session( db, user=user, ip_address=client_info["ip_address"], user_agent=client_info["user_agent"] ) # ایجاد توکن‌ها access_token_expires = timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) if not login_data.remember_me else timedelta(days=7) access_token = create_access_token( data={"sub": str(user.id), "session_id": session.id}, expires_delta=access_token_expires ) refresh_token = create_refresh_token( data={"sub": str(user.id), "session_id": session.id} ) # ذخیره توکن‌ها در سشن session.access_token = access_token session.refresh_token = refresh_token db.commit() return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "session_id": session.id, "user": user, } except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) except HTTPException: raise except Exception as e: logger.error(f"خطا در ورود: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="خطای سرور", ) @router.post("/refresh", response_model=Token) async def refresh_token( *, db: Session = Depends(get_db), request: Request, refresh_token: str, ) -> Any: """ تمدید توکن دسترسی """ from app.core.security import verify_token client_info = get_client_info(request) try: # بررسی توکن رفرش payload = verify_token(refresh_token) if not payload or payload.get("type") != "refresh": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="توکن رفرش نامعتبر", ) user_id = payload.get("sub") session_id = payload.get("session_id") if not user_id or not session_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="توکن رفرش نامعتبر", ) # بررسی سشن session = user_crud.validate_session(db, session_id) if not session or session.refresh_token != refresh_token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="سشن نامعتبر", ) # دریافت کاربر user = user_crud.get_by_id(db, user_id=int(user_id)) if not user or not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="کاربر غیرفعال", ) # ایجاد توکن جدید new_access_token = create_access_token( data={"sub": str(user.id), "session_id": session.id} ) # تمدید سشن session.access_token = new_access_token db.commit() return { "access_token": new_access_token, "refresh_token": refresh_token, "token_type": "bearer", "session_id": session.id, "user": user, } except HTTPException: raise except Exception as e: logger.error(f"خطا در تمدید توکن: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="خطای سرور", ) @router.post("/logout") async def logout( *, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), session_id: str, ) -> Any: """ خروج کاربر """ success = user_crud.logout(db, session_id) if not success: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="سشن یافت نشد", ) return {"message": "با موفقیت خارج شدید"} @router.get("/me", response_model=UserResponse) async def get_current_user_info( current_user: User = Depends(get_current_user), ) -> Any: """ دریافت اطلاعات کاربر جاری """ return current_user @router.put("/me", response_model=UserResponse) async def update_current_user( *, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), user_update: dict, ) -> Any: """ به‌روزرسانی اطلاعات کاربر جاری """ from app.schemas import UserUpdate try: update_data = UserUpdate(**user_update) user = user_crud.update(db, db_user=current_user, obj_in=update_data) return user except Exception as e: logger.error(f"خطا در به‌روزرسانی کاربر: {e}") raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) @router.post("/change-password") async def change_password( *, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), old_password: str, new_password: str, ) -> Any: """ تغییر رمز عبور """ from app.core.security import verify_password, get_password_hash from app.schemas import UserCreate try: # بررسی رمز قدیمی if not verify_password(old_password, current_user.hashed_password): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="رمز عبور فعلی اشتباه است", ) # بررسی رمز جدید try: UserCreate(password=new_password) # اعتبارسنجی except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e), ) # تغییر رمز current_user.hashed_password = get_password_hash(new_password) db.commit() # لاگ کردن from app.models import AuditLog audit_log = AuditLog( user_id=current_user.id, event_type="password_change", event_details={}, severity="info" ) db.add(audit_log) db.commit() return {"message": "رمز عبور با موفقیت تغییر کرد"} except HTTPException: raise except Exception as e: logger.error(f"خطا در تغییر رمز عبور: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="خطای سرور", )