257 lines
8.1 KiB
Python
257 lines
8.1 KiB
Python
import asyncio
|
||
import sqlite3
|
||
|
||
from os import getenv
|
||
from enum import Enum
|
||
from dataclasses import dataclass
|
||
|
||
from aiogram import Bot, Dispatcher, F
|
||
from aiogram.filters import Command
|
||
from aiogram.types import Message
|
||
from aiogram.fsm.context import FSMContext
|
||
from aiogram.fsm.state import State, StatesGroup
|
||
|
||
|
||
# Telegram bot token, read from the environment (passed to Bot() in main()).
TOKEN = getenv("BOT_TOKEN")

# Telegram user id of the initial administrator; main() seeds it into `users`.
_admin_id_raw = getenv("ADMIN_ID")
if _admin_id_raw is None:
    # int(None) would raise an opaque TypeError at import time; fail with an
    # actionable message instead.
    raise RuntimeError("ADMIN_ID environment variable is not set")
ADMIN_ID = int(_admin_id_raw)

# Single dispatcher all handlers in this module register on.
dp = Dispatcher()

# One shared connection/cursor for the whole process.
con = sqlite3.connect("wishlist.db")
cur = con.cursor()
|
||
|
||
|
||
class Form(StatesGroup):
    """FSM states recording which free-text reply the bot is waiting for."""

    # Admin flows: the next message is a user name to promote / to verify.
    waiting_for_answer_admin = State()
    waiting_for_answer_verify = State()

    # Gift creation flow: the next message describes the new gift.
    waiting_for_gift_add = State()
    # Declared but not referenced by any handler visible in this file.
    waiting_for_gift_add_approve = State()

    # Member flow: the next message is the name of the gift being claimed.
    waiting_for_gift = State()
|
||
|
||
|
||
class Role(Enum):
    """Access level stored as text in the ``role`` column of ``users``."""

    # Seeded for ADMIN_ID at startup and granted through /add_admin.
    ADMIN = "ADMIN"
    # Granted through /verify.
    MEMBER = "MEMBER"
    # Assigned to every new user on /start.
    DEFAULT = "DEFAULT"
|
||
|
||
|
||
@dataclass
|
||
class Gift:
|
||
name: str | None = None
|
||
description: str | None = None
|
||
link: str | None = None
|
||
picture: str | None = None
|
||
presenter: int | None = None
|
||
|
||
|
||
def is_admin(id: int) -> bool:
    """Return True if the user with Telegram id *id* has the ADMIN role.

    A user with no row in ``users`` (for example someone who never sent
    /start) is treated as a non-admin instead of raising ``TypeError`` on
    ``None[0]``.

    The parameter name shadows the ``id`` builtin; it is kept so existing
    keyword callers keep working.
    """
    # Bound parameter instead of string interpolation into the SQL text.
    row = cur.execute("SELECT role FROM users WHERE id=?", (id,)).fetchone()
    return row is not None and row[0] == Role.ADMIN.value
|
||
|
||
|
||
def is_member(id: int) -> bool:
    """Return True if the user with Telegram id *id* has the MEMBER role.

    A user with no row in ``users`` is treated as a non-member instead of
    raising ``TypeError`` on ``None[0]``.

    The parameter name shadows the ``id`` builtin; it is kept so existing
    keyword callers keep working.
    """
    # Bound parameter instead of string interpolation into the SQL text.
    row = cur.execute("SELECT role FROM users WHERE id=?", (id,)).fetchone()
    return row is not None and row[0] == Role.MEMBER.value
|
||
|
||
|
||
@dp.message(Command("start"))
async def command_start_handler(message: Message) -> None:
    """Handle /start: register the sender as a DEFAULT user and greet them.

    Registration is best-effort: a database failure is printed and the
    greeting is still sent.
    """
    sender = message.from_user

    # NOTE(review): aiogram types ``from_user`` as optional; skip registration
    # rather than relying on a catch-all ``except`` to hide the AttributeError.
    if sender is not None:
        try:
            # OR IGNORE: a repeated /start keeps the existing row and its role.
            cur.execute(
                "INSERT OR IGNORE INTO users(id, name, role) VALUES(?, ?, ?)",
                (sender.id, sender.full_name, Role.DEFAULT.value),
            )
            con.commit()
        except sqlite3.Error as e:
            # Only database errors are expected here; anything else is a bug
            # and should surface instead of being swallowed.
            print(e)

    await message.answer("Привет, теперь попроси подтвердить твое участие именинника 🥳")
|
||
|
||
|
||
@dp.message(Command("add_admin"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /add_admin: list MEMBER users and wait for the name to promote.

    Any previous FSM state is cleared first. Non-admin senders get no reply.
    """
    await state.clear()

    if not is_admin(message.from_user.id):
        return

    rows = cur.execute("SELECT name FROM users WHERE role=?", (Role.MEMBER.value,)).fetchall()

    # Header followed by one candidate name per line.
    prompt = "Напиши имя пользователя, которого надо сделать админом: \n"
    prompt += "".join(f"{row[0]} \n" for row in rows)

    await message.answer(prompt)
    await state.set_state(Form.waiting_for_answer_admin)
|
||
|
||
@dp.message(Form.waiting_for_answer_admin)
async def process_answer(message: Message, state: FSMContext):
    """Promote the user whose name was sent in reply to /add_admin.

    Success is confirmed only when the UPDATE actually changed a row; a
    non-text message or an unknown name gets an explicit "not found" reply
    instead of a false confirmation. The FSM state is cleared either way.
    """
    name = message.text
    updated = 0

    # message.text is None for photos, stickers, etc. — nothing to look up.
    if name:
        cur.execute("UPDATE users SET role=? WHERE name=?", (Role.ADMIN.value, name))
        # rowcount is the number of rows the UPDATE modified.
        updated = cur.rowcount
        con.commit()

    if updated > 0:
        await message.answer(f"Теперь {name} админ!")
    else:
        await message.answer("Не нашёл пользователя с таким именем.")

    await state.clear()
|
||
|
||
|
||
@dp.message(Command("verify"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /verify: list DEFAULT users and wait for the name to verify.

    Any previous FSM state is cleared first. Non-admin senders get no reply.
    """
    await state.clear()

    if not is_admin(message.from_user.id):
        return

    rows = cur.execute("SELECT name FROM users WHERE role=?", (Role.DEFAULT.value,)).fetchall()

    # Header followed by one candidate name per line.
    prompt = "Напиши имя пользователя, которого верифицировать: \n"
    prompt += "".join(f"{row[0]} \n" for row in rows)

    await message.answer(prompt)
    await state.set_state(Form.waiting_for_answer_verify)
|
||
|
||
@dp.message(Form.waiting_for_answer_verify)
async def process_answer(message: Message, state: FSMContext):
    """Verify (DEFAULT -> MEMBER) the user whose name was sent after /verify.

    The UPDATE is restricted to users that are still DEFAULT: without that
    filter, typing the name of an existing admin would silently demote them
    to MEMBER. Success is confirmed only when a row was actually changed.
    The FSM state is cleared either way.
    """
    name = message.text
    updated = 0

    # message.text is None for photos, stickers, etc. — nothing to look up.
    if name:
        cur.execute(
            "UPDATE users SET role=? WHERE name=? AND role=?",
            (Role.MEMBER.value, name, Role.DEFAULT.value),
        )
        # rowcount is the number of rows the UPDATE modified.
        updated = cur.rowcount
        con.commit()

    if updated > 0:
        await message.answer(f"Теперь {name} участник вечеринки!")
    else:
        await message.answer("Не нашёл неподтверждённого пользователя с таким именем.")

    await state.clear()
|
||
|
||
|
||
@dp.message(Command("add_gift"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /add_gift: explain the expected format and wait for the gift.

    Any previous FSM state is cleared first. Non-admin senders get no reply.
    """
    await state.clear()

    if not is_admin(message.from_user.id):
        return

    # Adjacent literals are concatenated at compile time into one message.
    instructions = (
        "Хорошо, отправь мне подарок вот так: \n\n"
        "НАЗВАНИЕ \nОПИСАНИЕ \nССЫЛКА \n\n"
        "А также можешь прикрепить фотографию!"
    )
    await message.answer(instructions)
    await state.set_state(Form.waiting_for_gift_add)
|
||
|
||
def _parse_gift(text: str | None, picture: str | None = None) -> Gift:
    """Build a Gift from up to three lines of text: name, description, link."""
    lines = (text or "").splitlines()
    # Pad so missing trailing lines become None; lines past the third are ignored.
    name, description, link = (lines + [None, None, None])[:3]
    return Gift(name=name, description=description, link=link, picture=picture)


@dp.message(Form.waiting_for_gift_add)
async def process_answer(message: Message, state: FSMContext):
    """Store the gift sent in reply to /add_gift and echo it back as a preview.

    The gift text comes from the message text or, for a photo, its caption.
    A message with neither (or with an empty first line) is rejected and the
    state is kept so the admin can simply send the gift again. The success
    message is sent only when a row was really inserted; a duplicate name
    (dropped by INSERT OR IGNORE) or a database error is reported instead.
    """
    # The last entry of message.photo is used — presumably the largest size.
    picture = message.photo[-1].file_id if message.photo else None
    gift = _parse_gift(message.text or message.caption, picture)

    if not gift.name:
        # Previously this path crashed on None.splitlines() or stored a gift
        # with no name.
        await message.answer("Не вижу названия подарка, попробуй ещё раз.")
        return

    preview = f"{gift.name}\n{gift.description}\n{gift.link}"
    if gift.picture is None:
        await message.answer(preview)
    else:
        await message.answer_photo(photo=gift.picture, caption=preview)

    added = False
    try:
        # One statement for both cases: a NULL picture equals omitting the column.
        cur.execute(
            "INSERT OR IGNORE INTO gifts(name, description, link, picture) VALUES(?, ?, ?, ?)",
            (gift.name, gift.description, gift.link, gift.picture),
        )
        # rowcount is 0 when OR IGNORE skipped the row because the name exists.
        added = cur.rowcount > 0
        con.commit()
    except sqlite3.Error as e:
        print(e)

    if added:
        await message.answer('Отлично, подарок добавлен!')
    else:
        await message.answer("Не удалось добавить подарок (возможно, такое название уже есть).")

    await state.clear()
|
||
|
||
|
||
@dp.message(Command("gifts"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /gifts: send every gift as a separate message.

    Admins get name, description and link only; everyone else additionally
    sees who is presenting the gift. Gifts with a picture are sent as a photo
    with the text as caption. Any previous FSM state is cleared first.
    """
    await state.clear()

    # Explicit column list so the positional mapping onto Gift does not depend
    # on the physical column order of the table.
    rows = cur.execute(
        "SELECT name, description, link, picture, presenter FROM gifts"
    ).fetchall()
    if not rows:
        return

    # The sender's role cannot change mid-loop: query it once, not per gift.
    hide_presenter = is_admin(message.from_user.id)

    for row in rows:
        gift = Gift(*row)

        text = f"Название: {gift.name}\nОписание: {gift.description}\nСсылка: {gift.link}"
        if not hide_presenter:
            text += f"\nДарит: {gift.presenter}"

        if gift.picture is None:
            await message.answer(text)
        else:
            await message.answer_photo(photo=gift.picture, caption=text)
|
||
|
||
|
||
@dp.message(Command("gift"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /gift: show all gifts to a member and ask which one they bring.

    Senders who are not MEMBERs get a refusal message. Any previous FSM state
    is cleared first.
    """
    await state.clear()

    if not is_member(message.from_user.id):
        await message.answer("Вы админ или не участвуете в вечеринке😔")
        return

    rows = cur.execute("SELECT * FROM gifts").fetchall()
    wishlist = [Gift(r[0], r[1], r[2], r[3], r[4]) for r in rows]

    for item in wishlist:
        card = f"Название: {item.name}\nОписание: {item.description}\nСсылка: {item.link}\nДарит: {item.presenter}"

        # Gifts with a stored picture go out as a photo with the card as caption.
        if item.picture is not None:
            await message.answer_photo(photo=item.picture, caption=card)
        else:
            await message.answer(card)

    await message.answer("Напишите название подарка, который хотите подарить:")
    await state.set_state(Form.waiting_for_gift)
|
||
|
||
|
||
@dp.message(Form.waiting_for_gift)
async def process_answer(message: Message, state: FSMContext):
    """Record the sender as presenter of the gift named in their reply to /gift.

    A gift can be claimed only while it has no presenter, or when the same
    person claims it again; previously any member could silently overwrite
    someone else's claim. Success is confirmed only when a row was actually
    updated. The FSM state is cleared either way.
    """
    gift_name = message.text
    presenter = message.from_user.full_name
    claimed = 0

    # message.text is None for photos, stickers, etc. — nothing to look up.
    if gift_name:
        cur.execute(
            "UPDATE gifts SET presenter=? WHERE name=? AND (presenter IS NULL OR presenter=?)",
            (presenter, gift_name, presenter),
        )
        # rowcount is the number of rows the UPDATE modified.
        claimed = cur.rowcount
        con.commit()

    if claimed > 0:
        await message.answer(f"Теперь {gift_name} дарит {presenter}!")
    else:
        await message.answer("Такого подарка нет или его уже дарит кто-то другой.")

    await state.clear()
|
||
|
||
|
||
async def main() -> None:
    """Create the tables if missing, seed the initial admin, and start polling."""
    schema = (
        "CREATE TABLE IF NOT EXISTS gifts(name PRIMARY KEY, description TEXT, link TEXT, picture TEXT, presenter TEXT)",
        "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL, role TEXT NOT NULL)",
    )
    for statement in schema:
        cur.execute(statement)

    # OR IGNORE: on later runs the existing row for ADMIN_ID is left as is.
    seed_admin = (ADMIN_ID, "ADMIN", Role.ADMIN.value)
    cur.execute("INSERT OR IGNORE INTO users(id, name, role) VALUES(?, ?, ?)", seed_admin)
    con.commit()

    await dp.start_polling(Bot(token=TOKEN))
|
||
|
||
|
||
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
|
||
|