1import jwt
2from datetime import datetime, timedelta
3from typing import Optional
4from fastapi import HTTPException, status
5from passlib.context import CryptContext
6
7pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
8SECRET_KEY = "your-256-bit-secret"
9ALGORITHM = "HS256"
10ACCESS_TOKEN_EXPIRE = 30 # minutes
11
12
13class AuthService:
14 """Handles JWT authentication, password hashing,
15 and user session management."""
16
17 def __init__(self, db_session):
18 self.db = db_session
19 self.blacklisted_tokens: set = set()
20
21 def hash_password(self, password: str) -> str:
22 return pwd_context.hash(password)
23
24 def verify_password(self, plain: str, hashed: str) -> bool:
25 return pwd_context.verify(plain, hashed)
26
27 def create_access_token(
28 self, user_id: str, extra: dict = {}
29 ) -> str:
30 expire = datetime.utcnow() + timedelta(
31 minutes=ACCESS_TOKEN_EXPIRE
32 )
33 payload = {
34 "sub": user_id,
35 "exp": expire,
36 "iat": datetime.utcnow(),
37 **extra,
38 }
39 return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
40
41 async def verify_token(self, token: str) -> dict:
42 if token in self.blacklisted_tokens:
43 raise HTTPException(
44 status_code=status.HTTP_401_UNAUTHORIZED,
45 detail="Token has been revoked",
46 )
47 try:
48 payload = jwt.decode(
49 token, SECRET_KEY, algorithms=[ALGORITHM]
50 )
51 except jwt.ExpiredSignatureError:
52 raise HTTPException(
53 status_code=status.HTTP_401_UNAUTHORIZED,
54 detail="Token has expired",
55 )
56 except jwt.InvalidTokenError:
57 raise HTTPException(
58 status_code=status.HTTP_401_UNAUTHORIZED,
59 detail="Invalid token",
60 )
61 user = await self.db.users.find_one(
62 {"_id": payload["sub"]}
63 )
64 if not user:
65 raise HTTPException(
66 status_code=status.HTTP_404_NOT_FOUND,
67 detail="User not found",
68 )
69 return user
70
71 def revoke_token(self, token: str) -> None:
72 self.blacklisted_tokens.add(token)
73
74 async def authenticate_user(
75 self, email: str, password: str
76 ) -> Optional[dict]:
77 user = await self.db.users.find_one({"email": email})
78 if not user:
79 return None
80 if not self.verify_password(password, user["password_hash"]):
81 return None
82 return user