from sqlalchemy.orm import Session from typing import Optional, List from app.models import User, UserSession, AuditLog from app.schemas import UserCreate, UserUpdate from app.core.security import get_password_hash, verify_password, generate_session_id from datetime import datetime, timedelta from app.core.config import settings import logging logger = logging.getLogger(__name__) class UserCRUD: @staticmethod def get_by_id(db: Session, user_id: int) -> Optional[User]: return db.query(User).filter(User.id == user_id, User.is_active == True).first() @staticmethod def get_by_email(db: Session, email: str) -> Optional[User]: return db.query(User).filter(User.email == email, User.is_active == True).first() @staticmethod def get_by_username(db: Session, username: str) -> Optional[User]: return db.query(User).filter(User.username == username, User.is_active == True).first() @staticmethod def get_all(db: Session, skip: int = 0, limit: int = 100) -> List[User]: return db.query(User).filter(User.is_active == True).offset(skip).limit(limit).all() @staticmethod def create(db: Session, obj_in: UserCreate) -> User: # بررسی وجود کاربر if UserCRUD.get_by_email(db, obj_in.email): raise ValueError("ایمیل قبلاً ثبت شده است") if UserCRUD.get_by_username(db, obj_in.username): raise ValueError("نام کاربری قبلاً ثبت شده است") # ایجاد کاربر db_user = User( username=obj_in.username, email=obj_in.email, hashed_password=get_password_hash(obj_in.password), full_name=obj_in.full_name, is_active=True, is_superuser=getattr(obj_in, 'is_superuser', False), is_verified=getattr(obj_in, 'is_verified', False) ) db.add(db_user) db.commit() db.refresh(db_user) # ثبت در لاگ audit_log = AuditLog( user_id=db_user.id, event_type="user_register", event_details={"username": db_user.username, "email": db_user.email}, severity="info" ) db.add(audit_log) db.commit() logger.info(f"کاربر جدید ثبت شد: {db_user.username}") return db_user @staticmethod def update(db: Session, db_user: User, obj_in: UserUpdate) -> User: update_data = obj_in.dict(exclude_unset=True) for field, value in update_data.items(): setattr(db_user, field, value) db_user.updated_at = datetime.utcnow() db.commit() db.refresh(db_user) # ثبت در لاگ audit_log = AuditLog( user_id=db_user.id, event_type="user_update", event_details={"updated_fields": list(update_data.keys())}, severity="info" ) db.add(audit_log) db.commit() return db_user @staticmethod def delete(db: Session, user_id: int) -> bool: db_user = UserCRUD.get_by_id(db, user_id) if not db_user: return False # غیرفعال کردن کاربر db_user.is_active = False db_user.updated_at = datetime.utcnow() # غیرفعال کردن سشن‌ها db.query(UserSession).filter(UserSession.user_id == user_id).update( {"is_active": False} ) # ثبت در لاگ audit_log = AuditLog( user_id=user_id, event_type="user_deactivate", event_details={"username": db_user.username}, severity="warning" ) db.add(audit_log) db.commit() logger.warning(f"کاربر غیرفعال شد: {db_user.username}") return True @staticmethod def authenticate(db: Session, username: str, password: str) -> Optional[User]: user = UserCRUD.get_by_username(db, username) if not user: user = UserCRUD.get_by_email(db, username) if not user: return None if not verify_password(password, user.hashed_password): return None if not user.is_active: raise ValueError("حساب کاربری غیرفعال است") # بررسی تلاش‌های ناموفق if user.login_attempts >= settings.MAX_LOGIN_ATTEMPTS: lockout_time = user.updated_at + timedelta(minutes=settings.ACCOUNT_LOCKOUT_MINUTES) if lockout_time > datetime.utcnow(): raise ValueError("حساب کاربری قفل شده است. لطفاً بعداً تلاش کنید") else: # بازنشانی تلاش‌ها user.login_attempts = 0 return user @staticmethod def create_session(db: Session, user: User, ip_address: str, user_agent: str) -> UserSession: # حذف سشن‌های قدیمی old_sessions = db.query(UserSession).filter( UserSession.user_id == user.id, UserSession.is_active == True ).all() for session in old_sessions: session.is_active = False # ایجاد سشن جدید session_id = generate_session_id() expires_at = datetime.utcnow() + timedelta(minutes=settings.SESSION_TIMEOUT_MINUTES) db_session = UserSession( id=session_id, user_id=user.id, ip_address=ip_address, user_agent=user_agent, expires_at=expires_at ) db.add(db_session) # به‌روزرسانی اطلاعات کاربر user.last_login = datetime.utcnow() user.login_attempts = 0 # ثبت در لاگ audit_log = AuditLog( user_id=user.id, event_type="user_login", event_details={"session_id": session_id, "ip_address": ip_address}, severity="info" ) db.add(audit_log) db.commit() logger.info(f"کاربر وارد شد: {user.username} از آدرس IP: {ip_address}") return db_session @staticmethod def validate_session(db: Session, session_id: str) -> Optional[UserSession]: session = db.query(UserSession).filter( UserSession.id == session_id, UserSession.is_active == True, UserSession.expires_at > datetime.utcnow() ).first() if session: # تمدید سشن session.expires_at = datetime.utcnow() + timedelta(minutes=settings.SESSION_TIMEOUT_MINUTES) db.commit() return session @staticmethod def logout(db: Session, session_id: str) -> bool: session = db.query(UserSession).filter( UserSession.id == session_id, UserSession.is_active == True ).first() if session: session.is_active = False # ثبت در لاگ audit_log = AuditLog( user_id=session.user_id, event_type="user_logout", event_details={"session_id": session_id}, severity="info" ) db.add(audit_log) db.commit() logger.info(f"کاربر خارج شد: session_id={session_id}") return True return False @staticmethod def increment_login_attempts(db: Session, username: str) -> None: user = UserCRUD.get_by_username(db, username) if not user: user = UserCRUD.get_by_email(db, username) if user: user.login_attempts += 1 user.updated_at = datetime.utcnow() # ثبت در لاگ audit_log = AuditLog( user_id=user.id, event_type="failed_login", event_details={ "username": username, "attempts": user.login_attempts }, severity="warning" ) db.add(audit_log) db.commit() logger.warning(f"تلاش ناموفق ورود برای کاربر: {username} - تلاش: {user.login_attempts}") user_crud = UserCRUD()