from typing import Optional

import asyncpg
from email_validator import EmailNotValidError, validate_email
from pydantic import EmailStr

from src.database.user import UserLogin, User


class Database:
    """Thin async wrapper around an asyncpg pool for the ``users`` table.

    Typical lifecycle: ``connect()`` -> ``init()`` -> queries ->
    ``disconnect()``.

    Note:
        ``connect()`` and ``init()`` do not raise on failure; they *return*
        the caught exception (and ``None`` on success). Callers must check
        the return value.
    """

    def __init__(
        self,
        admin_name: str,
        admin_password: str,
        db_user: str,
        db_pass: str,
        db_host: str,
        db_name: str,
    ):
        """Store credentials; no connection is opened here.

        Args:
            admin_name: Username of the admin row seeded by ``init()``.
            admin_password: Password value stored for the admin row, written
                to the table exactly as given.
            db_user: Database user passed to ``asyncpg.create_pool``.
            db_pass: Database password passed to ``asyncpg.create_pool``.
            db_host: Database host passed to ``asyncpg.create_pool``.
            db_name: Database name passed to ``asyncpg.create_pool``.
        """
        self.admin_name = admin_name
        self.admin_password = admin_password
        self.db_user = db_user
        self.db_pass = db_pass
        self.db_host = db_host
        self.db_name = db_name
        # Defined up front so the attribute exists even if connect() fails
        # or is never called; disconnect() relies on this.
        self.conn = None

    async def connect(self) -> Optional[Exception]:
        """Create the connection pool and store it on ``self.conn``.

        Returns:
            ``None`` on success, otherwise the exception that was caught
            (it is returned, not raised).
        """
        try:
            self.conn = await asyncpg.create_pool(
                user=self.db_user,
                password=self.db_pass,
                database=self.db_name,
                host=self.db_host,
            )
        except Exception as e:
            return e
        return None

    async def disconnect(self) -> None:
        """Close the pool if one is open; do nothing otherwise."""
        pool = self.conn
        if pool is None:
            # connect() failed or was never called: nothing to close.
            return
        # Clear the attribute first so a repeated disconnect() is harmless.
        self.conn = None
        await pool.close()

    async def init(self) -> Optional[Exception]:
        """Create the ``users`` table if missing and seed the admin row.

        The admin row is inserted only when no row with ``admin_name``
        already exists.

        Returns:
            ``None`` on success, otherwise the exception that was caught
            (it is returned, not raised). This includes the error produced
            when no pool is open.
        """
        try:
            await self.conn.execute(
                '''
                CREATE TABLE IF NOT EXISTS users (
                    username TEXT NOT NULL PRIMARY KEY UNIQUE,
                    password TEXT NOT NULL,
                    email TEXT NOT NULL,
                    role TEXT NOT NULL,
                    out_vpn_access BOOL NOT NULL,
                    docker_access BOOL NOT NULL,
                    git_access BOOL NOT NULL
                )
                '''
            )
            # INSERT ... SELECT ... WHERE NOT EXISTS keeps the seed
            # idempotent. The statement returns no rows we need, so
            # execute() is used rather than fetch().
            await self.conn.execute(
                '''
                INSERT INTO users (
                    username, password, email, role,
                    out_vpn_access, docker_access, git_access
                )
                SELECT $1, $2, 'admin@admin.admin', 'Admin', true, true, true
                WHERE NOT EXISTS (SELECT 1 FROM users WHERE username = $1)
                ''',
                self.admin_name,
                self.admin_password,
            )
        except Exception as e:
            return e
        return None

    async def get_user(self, username: str) -> Optional[list]:
        """Fetch one user row by username.

        Args:
            username: Value matched against the ``username`` column.

        Returns:
            The row's column values as a list, in the order returned by
            ``SELECT *``, or ``None`` when no such user exists.
        """
        row = await self.conn.fetchrow(
            'SELECT * FROM users WHERE username = $1', username
        )
        if row is None:
            return None
        return list(row.values())