del fastapi-users

This commit is contained in:
2025-07-16 17:01:10 +03:00
parent db3a2c7010
commit 87b1646f85
8 changed files with 149 additions and 169 deletions

View File

@ -20,7 +20,13 @@ async def load_fernet():
async def login(name: str, password: str):
async with httpx.AsyncClient() as client:
r = await client.post(f'http://127.0.0.1:7535/login?name={name}&password={password}')
r = await client.post(
'http://127.0.0.1:7535/login',
json={
"username": name,
"password": password
}
)
if r.is_success:
return True

View File

@ -11,7 +11,8 @@ dependencies = [
"fastapi (>=0.115.14,<0.116.0)",
"asyncpg (>=0.30.0,<0.31.0)",
"uvicorn (>=0.35.0,<0.36.0)",
"fastapi-users[sqlalchemy] (>=14.0.1,<15.0.0)"
"authx (>=1.4.3,<2.0.0)",
"pydantic[email] (>=2.11.7,<3.0.0)",
]

View File

@ -1,27 +0,0 @@
import uuid
from fastapi_users import FastAPIUsers, models
from fastapi_users.authentication import (
AuthenticationBackend,
BearerTransport,
JWTStrategy,
)
from fastapi_users.jwt import SecretType
from src.database.db import Database
from src.database.user import User
class Transport:
def __init__(self, secret: SecretType, db: Database):
self.secret = secret
self.db = db
self.bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
self.auth_backend = AuthenticationBackend(
name="jwt",
transport=self.bearer_transport,
get_strategy=self.get_jwt_strategy,
)
self.fastapi_users = FastAPIUsers[User, uuid.UUID](self.db.get_user_manager, [self.auth_backend])
self.current_active_user = self.fastapi_users.current_user(active=True)
def get_jwt_strategy(self) -> JWTStrategy[models.UP, models.ID]:
return JWTStrategy(secret=self.secret, lifetime_seconds=3600)

View File

@ -1,30 +0,0 @@
import uuid
from typing import Optional
from fastapi import Request
from fastapi_users import UUIDIDMixin, BaseUserManager
from fastapi_users.jwt import SecretType
from src.database.user import User
SECRET: SecretType
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
def __init__(self, secret: SecretType):
super().__init__()
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")

View File

@ -1,40 +1,78 @@
from collections.abc import AsyncGenerator
from fastapi import Depends
from fastapi_users.db import SQLAlchemyUserDatabase
from fastapi_users.jwt import SecretType
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
import asyncpg
from email_validator import EmailNotValidError, validate_email
from pydantic import EmailStr
from src.auth.user_manager import UserManager
from src.database.user import User, Base
from src.database.user import UserLogin, User
class Database:
def __init__(
self,
admin_name: str,
admin_password: str,
db_user: str,
db_pass: str,
db_host: str,
db_port: int,
db_name: str,
secret: SecretType
db_name: str
):
self.DATABASE_URL = f'postgresql+asyncpg://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}'
self.engine = create_async_engine(self.DATABASE_URL)
self.async_session_maker = async_sessionmaker(self.engine, expire_on_commit=False)
self.secret = secret
self.admin_name = admin_name
self.admin_password = admin_password
self.db_user = db_user
self.db_pass = db_pass
self.db_host = db_host
self.db_name = db_name
async def create_db_and_tables(self):
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def connect(self):
try:
self.conn =await asyncpg.create_pool(
user=self.db_user,
password=self.db_pass,
database=self.db_name,
host=self.db_host
)
except Exception as e:
return e
async def disconnect(self):
await self.conn.close()
async def init(self):
try:
await self.conn.execute(
'''
CREATE TABLE IF NOT EXISTS users (
username TEXT NOT NULL PRIMARY KEY UNIQUE,
password TEXT NOT NULL,
email TEXT NOT NULL,
role TEXT NOT NULL,
out_vpn_access BOOL NOT NULL
)
'''
)
await self.conn.fetch(
'''
INSERT INTO users (username, password, email, role, out_vpn_access)
SELECT $1, $2, 'admin@admin.admin', 'Admin', true
WHERE NOT EXISTS (SELECT 1 FROM users WHERE username = $1)
''',
self.admin_name,
self.admin_password
)
except Exception as e:
return e
async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
async with self.async_session_maker() as session:
yield session
async def get_user(self, username: str):
fetch = await self.conn.fetchrow('SELECT * FROM users WHERE username = $1', username)
if fetch is None:
return None
n = list()
for i in fetch.values():
n.append(i)
return n
async def get_user_db(self, session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)
async def get_user_manager(self, user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(self.secret, user_db)

View File

@ -1,15 +0,0 @@
import uuid
from fastapi_users import schemas
class UserRead(schemas.BaseUser[uuid.UUID]):
pass
class UserCreate(schemas.BaseUserCreate):
pass
class UserUpdate(schemas.BaseUserUpdate):
pass

View File

@ -1,13 +1,14 @@
from fastapi_users_db_sqlalchemy import SQLAlchemyBaseUserTableUUID
from sqlalchemy.orm import DeclarativeBase, Mapped
from pydantic import BaseModel, EmailStr
from src.database.role import Role
class Base(DeclarativeBase):
pass
class UserLogin(BaseModel):
username: str
password: str
class User(SQLAlchemyBaseUserTableUUID, Base):
username: Mapped[str]
role: Mapped[Role]
vpn_server_access: Mapped[bool]
main_server_access: Mapped[bool]
class User(UserLogin):
username: str
email: EmailStr
role: Role
out_vpn_access: bool
password: str

View File

@ -1,13 +1,13 @@
import asyncio
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import RedirectResponse
from database.db import Database
import uvicorn
from src.database.schemas import *
from src.auth.transport import Transport
from src.database.user import User
from contextlib import asynccontextmanager
app = FastAPI(title='sclient-main-server')
from authx import AuthXConfig, AuthX, RequestToken
from fastapi import Response, FastAPI, Request, HTTPException, Depends
from fastapi.responses import RedirectResponse
import uvicorn
from src.database.db import Database
from src.database.user import User, UserLogin
### Settings
# TODO: Create .env
@ -18,74 +18,80 @@ ADMIN_PASSWORD = 'admin'
DATABASE_USER = 'ADMIN'
DATABASE_PASS = '123123'
DATABASE_HOST = '127.0.0.1'
DATABASE_PORT = 5432
DATABASE_NAME = 'sonoma-db'
SECRET = 'SECRET'
###
@asynccontextmanager
async def lifespan(app: FastAPI):
await db.connect()
await db.init()
yield
await db.disconnect()
db = Database(
ADMIN_NAME,
ADMIN_PASSWORD,
DATABASE_USER,
DATABASE_PASS,
DATABASE_HOST,
DATABASE_PORT,
DATABASE_NAME,
SECRET
)
DATABASE_NAME
)
app = FastAPI(title='sclient-main-server', lifespan=lifespan)
config = AuthXConfig()
config.JWT_SECRET_KEY = SECRET
config.JWT_ACCESS_COOKIE_NAME = "sclient_access_token"
config.JWT_TOKEN_LOCATION = ["cookies"]
security = AuthX(config=config)
security.handle_errors(app)
transport = Transport(SECRET, db)
class App:
def init(self, loop) -> None:
config = uvicorn.Config(
app,
loop=loop,
host='0.0.0.0',
port=PORT
)
server = uvicorn.Server(config)
loop.run_until_complete(server.serve())
@app.get('/')
async def docs(self: Request):
return RedirectResponse(f'{self.url}docs')
@app.get("/authenticated-route")
async def authenticated_route(user: User = Depends(transport.current_active_user)):
return {"message": f"Hello {user.email}!"}
### Auth
@app.post('/login')
async def login(self: Request, credentials: UserLogin, response: Response):
user = await db.get_user(credentials.username)
if user is not None:
if user[1] == credentials.password:
token = security.create_access_token(uid=credentials.username)
response.set_cookie(config.JWT_ACCESS_COOKIE_NAME, token)
return {
"access_token": token
}
raise HTTPException(
401,
detail='Incorrect username or password'
)
###
### Protected
@app.get('/protected/auth', dependencies=[Depends(security.access_token_required)])
async def auth(self: Request):
try:
return {"message": "Hello world !"}
except Exception as e:
raise HTTPException(
401,
detail={"message": str(e)}
) from e
###
def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
server = App()
app.include_router(
transport.fastapi_users.get_auth_router(transport.auth_backend), prefix="/auth/jwt", tags=["auth"]
)
app.include_router(
transport.fastapi_users.get_register_router(UserRead, UserCreate),
prefix="/auth",
tags=["auth"],
)
app.include_router(
transport.fastapi_users.get_reset_password_router(),
prefix="/auth",
tags=["auth"],
)
app.include_router(
transport.fastapi_users.get_verify_router(UserRead),
prefix="/auth",
tags=["auth"],
)
app.include_router(
transport.fastapi_users.get_users_router(UserRead, UserUpdate),
prefix="/users",
tags=["users"],
)
loop.run_until_complete(db.create_db_and_tables())
server.init(loop)
uvicorn.run(app, host='0.0.0.0', port=PORT)
if __name__ == '__main__':
main()