295 lines
9.0 KiB
Python
295 lines
9.0 KiB
Python
import asyncio
|
||
import sqlite3
|
||
|
||
from os import getenv
|
||
from enum import Enum
|
||
from dataclasses import dataclass
|
||
|
||
from aiogram import Bot, Dispatcher, F
|
||
from aiogram.filters import Command
|
||
from aiogram.types import Message
|
||
from aiogram.fsm.context import FSMContext
|
||
from aiogram.fsm.state import State, StatesGroup
|
||
|
||
|
||
# Bot token and the Telegram id of the initial administrator are read from
# the environment. int() raises if ADMIN_ID is unset or not numeric.
TOKEN = getenv("BOT_TOKEN")
ADMIN_ID = int(getenv("ADMIN_ID"))

# One SQLite connection and one cursor, shared by the functions and
# handlers defined below.
con = sqlite3.connect("/data/wishlist.db")
cur = con.cursor()

# Dispatcher on which the @dp.message handlers below are registered.
dp = Dispatcher()
|
||
|
||
|
||
class Form(StatesGroup):
    """FSM states used by the multi-step dialogues in this module."""

    # Waiting for the name of the user to promote to admin / to verify.
    waiting_for_answer_admin = State()
    waiting_for_answer_verify = State()

    # Waiting for the description of a new gift.
    waiting_for_gift_add = State()
    # NOTE(review): not referenced by any handler visible in this file.
    waiting_for_gift_add_approve = State()

    # Waiting for the name of the gift the user wants to present.
    waiting_for_gift = State()
|
||
|
||
|
||
class Role(Enum):
    """Role stored (by value) in the ``role`` column of the ``users`` table."""

    # Seeded for ADMIN_ID at start-up; can also be granted via /add_admin.
    ADMIN = "ADMIN"
    # Granted through /verify.
    MEMBER = "MEMBER"
    # Assigned to everyone who sends /start.
    DEFAULT = "DEFAULT"
|
||
|
||
|
||
@dataclass
|
||
class Gift:
|
||
name: str
|
||
description: str | None = None
|
||
link: str | None = None
|
||
picture: str | None = None
|
||
presenter: int | None = None
|
||
|
||
|
||
def is_admin(id: int) -> bool:
    """Return True when the user with Telegram id *id* has the ADMIN role.

    A user that has no row in the ``users`` table (for example someone who
    never sent /start) is reported as a non-admin instead of raising.
    """
    # Bind the id as a parameter rather than formatting it into the SQL.
    row = cur.execute("SELECT role FROM users WHERE id=?", (id,)).fetchone()

    # fetchone() returns None when no row matches.
    return row is not None and row[0] == Role.ADMIN.value
|
||
|
||
|
||
def is_member(id: int) -> bool:
    """Return True when the user with Telegram id *id* has the MEMBER role.

    A user that has no row in the ``users`` table is reported as a
    non-member instead of raising.
    """
    # Bind the id as a parameter rather than formatting it into the SQL.
    row = cur.execute("SELECT role FROM users WHERE id=?", (id,)).fetchone()

    # fetchone() returns None when no row matches.
    return row is not None and row[0] == Role.MEMBER.value
|
||
|
||
|
||
def gift_text(gift: Gift, id: int) -> str:
    """Render *gift* as the message text shown to the user with id *id*.

    The name is always present; description and link are added when set.
    The presenter line is appended for everyone except admins.
    """
    parts = [f"🎁 Название: {gift.name}"]

    if gift.description is not None:
        parts.append(f"\n📃 Описание: {gift.description}")

    if gift.link is not None:
        parts.append(f"\n📎 Ссылка: {gift.link}")

    # Admins get the text without any presenter information.
    if not is_admin(id):
        if gift.presenter is None:
            parts.append("\n😭 Дарит: Никто")
        else:
            parts.append(f"\n👤 Дарит: {gift.presenter}")

    return "".join(parts)
|
||
|
||
|
||
@dp.message(Command("start"))
async def command_start_handler(message: Message) -> None:
    """Handle /start: store the sender with the DEFAULT role and greet them."""
    insert_user = "INSERT OR IGNORE INTO users(id, name, role) VALUES(?, ?, ?)"

    try:
        sender = message.from_user
        # OR IGNORE leaves an already registered user (and their role) untouched.
        cur.execute(insert_user, (sender.id, sender.full_name, Role.DEFAULT.value))
        con.commit()
    except Exception as e:
        # Best effort: the greeting below is sent even if the insert failed.
        print(e)

    await message.answer("Привет, теперь попроси подтвердить твое участие именинника 🥳")
|
||
|
||
|
||
@dp.message(Command("add_admin"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /add_admin: list MEMBER users and wait for a name to promote.

    Non-admin senders only get their FSM state cleared; nothing is sent.
    """
    await state.clear()

    if not is_admin(message.from_user.id):
        return

    rows = cur.execute("SELECT name FROM users WHERE role=?", (Role.MEMBER.value,)).fetchall()

    # Header followed by one candidate name per line.
    prompt = "Напиши имя пользователя, которого надо сделать админом: \n"
    prompt += "".join(f"{row[0]} \n" for row in rows)

    await message.answer(prompt)
    await state.set_state(Form.waiting_for_answer_admin)
|
||
|
||
@dp.message(Form.waiting_for_answer_admin)
async def process_answer(message: Message, state: FSMContext):
    """Promote the user whose name was sent to ADMIN and report the result.

    The promotion is confirmed only when the UPDATE actually changed a row.
    An unknown name, or a message without text, gets a "not found" reply
    instead of a false confirmation. The FSM state is cleared either way.
    """
    name = message.text  # None for non-text messages (photo, sticker, ...)
    promoted = 0

    if name:
        # Every user with this exact name is promoted; names are not unique.
        cur.execute("UPDATE users SET role=? WHERE name=?", (Role.ADMIN.value, name))
        con.commit()
        # rowcount is the number of rows the UPDATE modified.
        promoted = cur.rowcount

    if promoted:
        await message.answer(f"Теперь {name} админ!")
    else:
        await message.answer("Пользователь с таким именем не найден 😔")

    await state.clear()
|
||
|
||
|
||
@dp.message(Command("verify"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /verify: list DEFAULT users and wait for a name to verify.

    Non-admin senders only get their FSM state cleared; nothing is sent.
    """
    await state.clear()

    if not is_admin(message.from_user.id):
        return

    rows = cur.execute("SELECT name FROM users WHERE role=?", (Role.DEFAULT.value,)).fetchall()

    # Header followed by one unverified user name per line.
    prompt = "Напиши имя пользователя, которого верифицировать: \n"
    prompt += "".join(f"{row[0]} \n" for row in rows)

    await message.answer(prompt)
    await state.set_state(Form.waiting_for_answer_verify)
|
||
|
||
@dp.message(Form.waiting_for_answer_verify)
async def process_answer(message: Message, state: FSMContext):
    """Give the MEMBER role to the user whose name was sent.

    The change is confirmed only when the UPDATE actually changed a row.
    An unknown name, or a message without text, gets a "not found" reply
    instead of a false confirmation. The FSM state is cleared either way.
    """
    name = message.text  # None for non-text messages (photo, sticker, ...)
    verified = 0

    if name:
        # Every user with this exact name is updated; names are not unique.
        cur.execute("UPDATE users SET role=? WHERE name=?", (Role.MEMBER.value, name))
        con.commit()
        # rowcount is the number of rows the UPDATE modified.
        verified = cur.rowcount

    if verified:
        await message.answer(f"Теперь {name} участник вечеринки!")
    else:
        await message.answer("Пользователь с таким именем не найден 😔")

    await state.clear()
|
||
|
||
|
||
@dp.message(Command("add_gift"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /add_gift: explain the expected format and wait for the gift.

    Non-admin senders only get their FSM state cleared; nothing is sent.
    """
    await state.clear()

    if not is_admin(message.from_user.id):
        return

    # Expected input: name, description and link on separate lines, plus an
    # optional photo.
    instructions = "Хорошо, отправь мне подарок вот так: \n\nНАЗВАНИЕ \nОПИСАНИЕ \nССЫЛКА \n\nА также можешь прикрепить фотографию!"

    await message.answer(instructions)
    await state.set_state(Form.waiting_for_gift_add)
|
||
|
||
@dp.message(Form.waiting_for_gift_add)
async def process_answer(message: Message, state: FSMContext):
    """Parse the gift sent by the admin, echo it back and store it.

    The message text (or the photo caption) is read line by line: the first
    line is the name, the optional second line the description and the
    optional third line the link. The largest size of an attached photo is
    stored by its file id.

    A message with neither text nor caption is answered with a hint and the
    state is kept so the gift can be resent. Success is reported only when a
    row was really inserted; a duplicate name or a database error produces a
    failure message.
    """
    raw = message.text or message.caption
    # Both are None for e.g. a photo without a caption.
    lines = raw.splitlines() if raw else []

    if not lines:
        await message.answer("Не вижу названия подарка 😔 Отправь его текстом или подписью к фотографии.")
        return

    gift = Gift(lines[0])

    if len(lines) >= 2:
        gift.description = lines[1]

    if len(lines) >= 3:
        gift.link = lines[2]

    if message.photo:
        # The last entry of message.photo is used as the picture.
        gift.picture = message.photo[-1].file_id

    # Show the admin how the gift will look before storing it.
    preview = gift_text(gift, message.from_user.id)
    if gift.picture is None:
        await message.answer(preview)
    else:
        await message.answer_photo(photo=gift.picture, caption=preview)

    saved = False
    try:
        # A None picture is stored as NULL, so one statement covers gifts
        # with and without a photo.
        cur.execute(
            "INSERT OR IGNORE INTO gifts(name, description, link, picture) VALUES(?, ?, ?, ?)",
            (gift.name, gift.description, gift.link, gift.picture),
        )
        con.commit()
        # rowcount stays 0 when OR IGNORE skipped a duplicate name.
        saved = cur.rowcount > 0
    except sqlite3.Error as e:
        print(e)

    if saved:
        await message.answer('Отлично, подарок добавлен!')
    else:
        await message.answer("Не получилось добавить подарок: возможно, подарок с таким названием уже есть 😔")

    await state.clear()
|
||
|
||
|
||
@dp.message(Command("gifts"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /gifts: send every gift that has no presenter yet.

    Each gift goes out as its own message, as a photo with caption when a
    picture is stored. Nothing is sent when there are no such gifts.
    """
    await state.clear()

    rows = cur.execute("SELECT * FROM gifts WHERE presenter IS NULL").fetchall()
    # Columns arrive in table order: name, description, link, picture, presenter.
    unclaimed = [Gift(row[0], row[1], row[2], row[3], row[4]) for row in rows]

    viewer_id = message.from_user.id
    for gift in unclaimed:
        caption = gift_text(gift, viewer_id)

        if gift.picture is not None:
            await message.answer_photo(photo=gift.picture, caption=caption)
        else:
            await message.answer(caption)
|
||
|
||
|
||
@dp.message(Command("my_gift"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /my_gift: send every gift whose presenter is the sender.

    Each gift goes out as its own message, as a photo with caption when a
    picture is stored. Nothing is sent when the sender has claimed no gift.
    """
    await state.clear()

    viewer_id = message.from_user.id
    rows = cur.execute("SELECT * FROM gifts WHERE presenter=?", (viewer_id, )).fetchall()
    # Columns arrive in table order: name, description, link, picture, presenter.
    mine = [Gift(row[0], row[1], row[2], row[3], row[4]) for row in rows]

    for gift in mine:
        caption = gift_text(gift, viewer_id)

        if gift.picture is not None:
            await message.answer_photo(photo=gift.picture, caption=caption)
        else:
            await message.answer(caption)
|
||
|
||
|
||
@dp.message(Command("gift"))
async def command_start_handler(message: Message, state: FSMContext) -> None:
    """Handle /gift: show all gifts, then wait for the name of the chosen one."""
    await state.clear()

    # NOTE(review): the original gated this handler behind
    # is_member(message.from_user.id), with a refusal message for admins and
    # non-participants; that check is commented out, so any sender gets here.
    rows = cur.execute("SELECT * FROM gifts").fetchall()
    # Columns arrive in table order: name, description, link, picture, presenter.
    every_gift = [Gift(row[0], row[1], row[2], row[3], row[4]) for row in rows]

    viewer_id = message.from_user.id
    for gift in every_gift:
        caption = gift_text(gift, viewer_id)

        if gift.picture is not None:
            await message.answer_photo(photo=gift.picture, caption=caption)
        else:
            await message.answer(caption)

    await message.answer("Напишите название подарка, который хотите подарить:")
    await state.set_state(Form.waiting_for_gift)
|
||
|
||
|
||
@dp.message(Form.waiting_for_gift)
async def process_answer(message: Message, state: FSMContext):
    """Record the sender as presenter of the gift whose name was sent.

    Only a gift that is unclaimed, or already claimed by the sender, is
    updated, so one user cannot silently take over another user's gift.
    The claim is confirmed only when a row really changed; otherwise the
    sender is told the gift is missing or taken. The FSM state is cleared
    either way.
    """
    gift_name = message.text  # None for non-text messages
    user = message.from_user
    claimed = 0

    if gift_name:
        cur.execute(
            "UPDATE gifts SET presenter=? WHERE name=? AND (presenter IS NULL OR presenter=?)",
            (user.id, gift_name, user.id),
        )
        con.commit()
        # rowcount is the number of rows the UPDATE modified.
        claimed = cur.rowcount

    if claimed:
        await message.answer(f"Теперь {gift_name} дарит {user.full_name}!")
    else:
        await message.answer("Такого подарка нет или его уже дарит кто-то другой 😔")

    await state.clear()
|
||
|
||
|
||
@dp.message(Command("reset_gift"))
async def process_answer(message: Message, state: FSMContext):
    """Handle /reset_gift: release every gift the sender had claimed."""
    await state.clear()

    # Binding None writes NULL, which marks the gifts as unclaimed again.
    release_params = (None, message.from_user.id)
    cur.execute("UPDATE gifts SET presenter=? WHERE presenter=?", release_params)
    con.commit()

    await message.answer("Вы больше ничего не дарите!")
|
||
|
||
|
||
async def main() -> None:
    """Create the tables if needed, seed the initial admin and start polling."""
    schema = (
        "CREATE TABLE IF NOT EXISTS gifts(name TEXT PRIMARY KEY , description TEXT, link TEXT, picture TEXT, presenter INTEGER REFERENCES users)",
        "CREATE TABLE IF NOT EXISTS users(id INTEGER PRIMARY KEY, name TEXT NOT NULL, role TEXT NOT NULL)",
    )
    for statement in schema:
        cur.execute(statement)

    # OR IGNORE keeps an existing row for ADMIN_ID unchanged on restart.
    initial_admin = (ADMIN_ID, "ADMIN", Role.ADMIN.value)
    cur.execute("INSERT OR IGNORE INTO users(id, name, role) VALUES(?, ?, ?)", initial_admin)
    con.commit()

    await dp.start_polling(Bot(token=TOKEN))
|
||
|
||
|
||
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
|
||
|