Загрузка...

Тг бот для продажи доступа к каналу

Тема в разделе Python создана пользователем 0О0 3 май 2025. 176 просмотров

Загрузка...
  1. 0О0
    0О0 Автор темы 3 май 2025 lolz.live/threads/11111/ - Работайте с лучшими. 629 1 май 2022
    Python
    import asyncio
    from datetime import datetime, timedelta
    import sqlite3
    from aiogram import Bot, Dispatcher, types, F
    from aiogram.filters import Command
    from aiogram.types import (
    InlineKeyboardMarkup,
    InlineKeyboardButton,
    ReplyKeyboardMarkup,
    KeyboardButton
    )
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    import logging
    from functools import lru_cache

    # ===================== НАСТРОЙКИ =====================
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    BOT_TOKEN = "YOUR_BOT_TOKEN" # Замените на ваш токен
    ADMIN_ID = 123456789 # Ваш ID в Telegram
    CRYPTOBOT_TOKEN = "YOUR_CRYPTOBOT_TOKEN" # Для реальных платежей

    bot = Bot(token=BOT_TOKEN)
    dp = Dispatcher()
    scheduler = AsyncIOScheduler()

    # ===================== БАЗА ДАННЫХ =====================
    def init_db():
    """Инициализация базы данных"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()

    # Таблицы
    c.execute('''CREATE TABLE IF NOT EXISTS channels
    (id INTEGER PRIMARY KEY AUTOINCREMENT,
    channel_id INTEGER UNIQUE,
    title TEXT,
    description TEXT,
    invite_link TEXT)''')

    c.execute('''CREATE TABLE IF NOT EXISTS subscriptions
    (id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    channel_id INTEGER,
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    status TEXT DEFAULT 'active')''')

    c.execute('''CREATE TABLE IF NOT EXISTS payments
    (id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    channel_id INTEGER,
    amount REAL,
    currency TEXT,
    payment_date TIMESTAMP,
    status TEXT)''')

    c.execute('''CREATE TABLE IF NOT EXISTS prices
    (id INTEGER PRIMARY KEY AUTOINCREMENT,
    channel_id INTEGER,
    sub_type TEXT,
    price REAL,
    UNIQUE(channel_id, sub_type))''')

    c.execute('''CREATE TABLE IF NOT EXISTS promo_codes
    (id INTEGER PRIMARY KEY AUTOINCREMENT,
    code TEXT UNIQUE,
    amount REAL,
    uses_left INTEGER,
    expires_at TIMESTAMP)''')

    c.execute('''CREATE TABLE IF NOT EXISTS used_promo_codes
    (id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id INTEGER,
    promo_id INTEGER,
    used_at TIMESTAMP)''')

    c.execute('''CREATE TABLE IF NOT EXISTS user_balance
    (user_id INTEGER PRIMARY KEY,
    balance REAL)''')

    c.execute('''CREATE TABLE IF NOT EXISTS antispam
    (user_id INTEGER PRIMARY KEY,
    last_action TIMESTAMP)''')

    conn.commit()
    conn.close()

    init_db()

    # ===================== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ =====================
    @lru_cache(maxsize=100)
    def get_channel_price(channel_id: int, sub_type: str) -> float:
    """Получить цену подписки с кэшированием"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("SELECT price FROM prices WHERE channel_id=? AND sub_type=?", (channel_id, sub_type))
    result = c.fetchone()
    conn.close()
    return result[0] if result else None

    def get_channel_info(channel_id: int) -> dict:
    """Получить информацию о канале"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("SELECT title, description, invite_link FROM channels WHERE channel_id=?", (channel_id,))
    result = c.fetchone()
    conn.close()
    return {
    'title': result[0],
    'description': result[1],
    'invite_link': result[2]
    } if result else None

    def get_user_balance(user_id: int) -> float:
    """Получить баланс пользователя"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("SELECT balance FROM user_balance WHERE user_id=?", (user_id,))
    result = c.fetchone()
    conn.close()
    return result[0] if result else 0.0

    def update_user_balance(user_id: int, amount: float):
    """Обновить баланс пользователя"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("""INSERT OR REPLACE INTO user_balance
    (user_id, balance)
    VALUES (?, COALESCE(
    (SELECT balance FROM user_balance
    WHERE user_id=?), 0) + ?)""",
    (user_id, user_id, amount))
    conn.commit()
    conn.close()

    def get_promo_code(code: str) -> tuple:
    """Проверить промокод"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("""SELECT id, amount, uses_left
    FROM promo_codes
    WHERE code=? AND (expires_at IS NULL OR expires_at > datetime('now'))""", (code,))
    result = c.fetchone()
    conn.close()
    return result

    def use_promo_code(user_id: int, promo_id: int) -> bool:
    """Использовать промокод"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    try:
    c.execute("INSERT INTO used_promo_codes (user_id, promo_id, used_at) VALUES (?, ?, datetime('now'))",
    (user_id, promo_id))
    c.execute("UPDATE promo_codes SET uses_left = uses_left - 1 WHERE id=?", (promo_id,))
    conn.commit()
    return True
    except:
    conn.rollback()
    return False
    finally:
    conn.close()

    def check_antispam(user_id: int) -> bool:
    """Проверить антиспам защиту"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("SELECT last_action FROM antispam WHERE user_id=?", (user_id,))
    result = c.fetchone()

    now = datetime.now()
    if result:
    last_action = datetime.strptime(result[0], '%Y-%m-%d %H:%M:%S')
    if (now - last_action).seconds < 5:
    conn.close()
    return False

    c.execute("""INSERT OR REPLACE INTO antispam
    (user_id, last_action) VALUES (?, ?)""",
    (user_id, now.strftime('%Y-%m-%d %H:%M:%S')))
    conn.commit()
    conn.close()
    return True

    async def add_subscription(user_id: int, channel_id: int, period: str):
    """Добавить подписку"""
    now = datetime.now()

    if period == "1day":
    end_date = now + timedelta(days=1)
    elif period == "1week":
    end_date = now + timedelta(weeks=1)
    elif period == "1month":
    end_date = now + timedelta(days=30)

    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute(
    "INSERT INTO subscriptions (user_id, channel_id, start_date, end_date) VALUES (?, ?, ?, ?)",
    (user_id, channel_id, now, end_date)
    )
    conn.commit()
    conn.close()

    async def check_expired_subscriptions():
    """Проверить истекшие подписки"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("""SELECT user_id, channel_id FROM subscriptions
    WHERE end_date <= datetime('now') AND status='active'""")
    expired = c.fetchall()

    for user_id, channel_id in expired:
    try:
    await bot.ban_chat_member(channel_id, user_id)
    c.execute("""UPDATE subscriptions SET status='expired'
    WHERE user_id=? AND channel_id=?""", (user_id, channel_id))
    await bot.send_message(user_id, f" Ваша подписка в канале истекла!")
    except Exception as e:
    logger.error(f"Error removing user {user_id} from {channel_id}: {e}")

    conn.commit()
    conn.close()

    # ===================== КЛАВИАТУРЫ =====================
    def main_menu() -> ReplyKeyboardMarkup:
    """Главное меню пользователя"""
    return ReplyKeyboardMarkup(
    keyboard=[
    [KeyboardButton(text=" Купить подписку")],
    [KeyboardButton(text=" Мой кабинет")],
    [KeyboardButton(text=" Применить промокод")]
    ],
    resize_keyboard=True
    )

    def admin_menu() -> ReplyKeyboardMarkup:
    """Меню администратора"""
    return ReplyKeyboardMarkup(
    keyboard=[
    [KeyboardButton(text=" Статистика")],
    [KeyboardButton(text=" Управление каналами")],
    [KeyboardButton(text=" Рассылка")],
    [KeyboardButton(text=" Управление промокодами")],
    [KeyboardButton(text=" В главное меню")]
    ],
    resize_keyboard=True
    )

    def channel_list_keyboard(action: str = "show") -> InlineKeyboardMarkup:
    """Список каналов"""
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    c.execute("SELECT channel_id, title FROM channels")
    channels = c.fetchall()
    conn.close()

    buttons = []
    for channel_id, title in channels:
    buttons.append([InlineKeyboardButton(
    text=title,
    callback_data=f"{action}_channel_{channel_id}"
    )])
    buttons.append([InlineKeyboardButton(text=" Назад", callback_data="back_to_main")])

    return InlineKeyboardMarkup(inline_keyboard=buttons)

    def subscription_types_keyboard(channel_id: int) -> InlineKeyboardMarkup:
    """Типы подписок для канала"""
    return InlineKeyboardMarkup(inline_keyboard=[
    [InlineKeyboardButton(text="1 день", callback_data=f"sub_{channel_id}_1day")],
    [InlineKeyboardButton(text="1 неделя", callback_data=f"sub_{channel_id}_1week")],
    [InlineKeyboardButton(text="1 месяц", callback_data=f"sub_{channel_id}_1month")],
    [InlineKeyboardButton(text=" Назад", callback_data="back_to_channels")]
    ])

    def payment_keyboard(pay_url: str, amount: float) -> InlineKeyboardMarkup:
    """Клавиатура оплаты"""
    return InlineKeyboardMarkup(inline_keyboard=[
    [InlineKeyboardButton(text=f" Оплатить {amount}₽", url=pay_url)],
    [InlineKeyboardButton(text=" Я оплатил", callback_data="check_payment")]
    ])

    # ===================== ОСНОВНЫЕ ХЭНДЛЕРЫ =====================
    @dp.message(Command("start"))
    async def start(message: types.Message):
    """Обработчик команды /start"""
    if message.from_user.id == ADMIN_ID:
    await message.answer(" Админ-панель", reply_markup=admin_menu())
    else:
    await message.answer("Добро пожаловать!", reply_markup=main_menu())

    @dp.message(F.text == " Мой кабинет")
    async def my_account(message: types.Message):
    """Личный кабинет пользователя"""
    user_id = message.from_user.id

    # Получаем статистику
    conn = sqlite3.connect('bot.db')
    c = conn.cursor()

    # Дата регистрации
    c.execute("SELECT MIN(start_date) FROM subscriptions WHERE user_id=?", (user_id,))
    reg_date = c.fetchone()[0] or "неизвестно"

    # Общий баланс
    balance = get_user_balance(user_id)

    # Активные подписки
    c.execute("""SELECT c.title, s.end_date
    FROM subscriptions s
    JOIN channels c ON s.channel_id = c.channel_id
    WHERE s.user_id=? AND s.status='active' AND s.end_date >= datetime('now')""",
    (user_id,))
    active_subs = c.fetchall()
    conn.close()

    # Формируем сообщение
    msg = [
    f" <b>Личный кабинет</b>",
    f" Дата регистрации: {reg_date}",
    f" Текущий баланс: {balance}₽",
    f" Активных подписок: {len(active_subs)}",
    "",
    "<b>Ваши подписки:</b>"
    ]

    for title, end_date in active_subs:
    msg.append(f"• {title} (до {end_date})")

    await message.answer("\n".join(msg), parse_mode="HTML", reply_markup=main_menu())

    @dp.message(F.text == " Применить промокод")
    async def apply_promo(message: types.Message):
    """Активация промокода"""
    if not check_antispam(message.from_user.id):
    await message.answer(" Пожалуйста, не так быстро! Подождите 5 секунд")
    return

    await message.answer(
    "Введите промокод:",
    reply_markup=ReplyKeyboardMarkup(
    keyboard=[[KeyboardButton(text=" Отменить")]],
    resize_keyboard=True
    )
    )
    await dp.storage.set_data(message.from_user.id, {"action": "enter_promo"})

    @dp.message(F.text == " Купить подписку")
    async def buy_subscription(message: types.Message):
    """Покупка подписки"""
    await message.answer(
    "Выберите канал:",
    reply_markup=channel_list_keyboard()
    )

    # ===================== ОБРАБОТКА ПРОМОКОДОВ =====================
    @dp.message(F.text)
    async def process_text(message: types.Message):
    """Обработка текстовых сообщений"""
    user_data = await dp.storage.get_data(message.from_user.id)

    # Обработка промокода
    if user_data.get("action") == "enter_promo":
    promo_data = get_promo_code(message.text.upper())
    if not promo_data:
    await message.answer(" Промокод не найден или истек срок действия", reply_markup=main_menu())
    await dp.storage.set_data(message.from_user.id, {})
    return

    promo_id, amount, uses_left = promo_data

    if uses_left is not None and uses_left <= 0:
    await message.answer(" Лимит использования промокода исчерпан", reply_markup=main_menu())
    await dp.storage.set_data(message.from_user.id, {})
    return

    if not use_promo_code(message.from_user.id, promo_id):
    await message.answer(" Ошибка при активации промокода", reply_markup=main_menu())
    await dp.storage.set_data(message.from_user.id, {})
    return

    # Зачисляем средства
    update_user_balance(message.from_user.id, amount)

    await message.answer(
    f" Промокод активирован!\n"
    f"На ваш баланс зачислено: {amount}₽\n"
    f"Текущий баланс: {get_user_balance(message.from_user.id)}₽",
    reply_markup=main_menu()
    )
    await dp.storage.set_data(message.from_user.id, {})

    # ===================== ОБРАБОТКА ПОДПИСОК =====================
    @dp.callback_query(F.data.startswith("sub_"))
    async def process_subscription(callback: types.CallbackQuery):
    """Обработка выбора подписки"""
    _, channel_id, sub_type = callback.data.split("_")
    channel_id = int(channel_id)
    price = get_channel_price(channel_id, sub_type)
    user_id = callback.from_user.id
    balance = get_user_balance(user_id)

    if balance >= price:
    # Оплата с баланса
    update_user_balance(user_id, -price)
    await add_subscription(user_id, channel_id, sub_type)

    channel_info = get_channel_info(channel_id)
    await callback.message.edit_text(
    f" Подписка на канал <b>{channel_info['title']}</b> оформлена!\n"
    f"Списано: {price}₽\n"
    f"Остаток баланса: {balance - price}₽\n\n"
    f"Ссылка для входа: {channel_info['invite_link']}",
    parse_mode="HTML",
    reply_markup=main_menu()
    )
    else:
    # Обычная оплата
    pay_url = f"https://example.com/pay/{user_id}/{channel_id}/{sub_type}" # Замените на реальную платежную систему
    await callback.message.edit_text(
    f" Оплата подписки\n\n"
    f"Канал: {get_channel_info(channel_id)['title']}\n"
    f"Тип: {sub_type}\n"
    f"Стоимость: {price}₽\n"
    f"Ваш баланс: {balance}₽\n"
    f"К оплате: {price - balance}₽",
    reply_markup=payment_keyboard(pay_url, price - balance)
    )

    # ===================== АДМИН-ПАНЕЛЬ =====================
    @dp.message(F.text == " Управление каналами")
    async def manage_channels(message: types.Message):
    """Управление каналами"""
    if message.from_user.id != ADMIN_ID:
    return

    await message.answer(
    "Выберите канал:",
    reply_markup=channel_list_keyboard("manage")
    )

    @dp.callback_query(F.data.startswith("manage_channel_"))
    async def manage_channel(callback: types.CallbackQuery):
    """Управление конкретным каналом"""
    channel_id = int(callback.data.split("_")[2])
    channel_info = get_channel_info(channel_id)

    await callback.message.edit_text(
    f"<b>{channel_info['title']}</b>\n\n"
    f" Описание:\n{channel_info['description']}\n\n"
    f" Ссылка: {channel_info['invite_link']}\n"
    f"ID: {channel_id}",
    parse_mode="HTML",
    reply_markup=InlineKeyboardMarkup(inline_keyboard=[
    [InlineKeyboardButton(text=" Изменить описание", callback_data=f"edit_desc_{channel_id}")],
    [InlineKeyboardButton(text=" Изменить цены", callback_data=f"edit_prices_{channel_id}")],
    [InlineKeyboardButton(text=" Назад", callback_data="back_to_channels")]
    ])
    )

    @dp.callback_query(F.data.startswith("edit_desc_"))
    async def edit_description(callback: types.CallbackQuery):
    """Редактирование описания канала"""
    channel_id = int(callback.data.split("_")[2])
    await callback.message.edit_text(
    "Введите новое описание для канала:",
    reply_markup=InlineKeyboardMarkup(inline_keyboard=[
    [InlineKeyboardButton(text=" Отменить", callback_data=f"manage_channel_{channel_id}")]
    ])
    )
    await dp.storage.set_data(callback.from_user.id, {
    "action": "edit_description",
    "channel_id": channel_id
    })

    @dp.callback_query(F.data.startswith("edit_prices_"))
    async def edit_prices(callback: types.CallbackQuery):
    """Редактирование цен канала"""
    channel_id = int(callback.data.split("_")[2])
    await callback.message.edit_text(
    "Выберите тип подписки для изменения цены:",
    reply_markup=subscription_types_keyboard(channel_id)
    )

    @dp.message(F.text == " Управление промокодами")
    async def manage_promos(message: types.Message):
    """Управление промокодами"""
    if message.from_user.id != ADMIN_ID:
    return

    await message.answer(
    "Выберите действие:",
    reply_markup=InlineKeyboardMarkup(inline_keyboard=[
    [InlineKeyboardButton(text=" Создать промокод", callback_data="create_promo")],
    [InlineKeyboardButton(text=" Выдать промокод", callback_data="give_promo")],
    [InlineKeyboardButton(text=" Статистика промокодов", callback_data="promo_stats")],
    [InlineKeyboardButton(text=" Назад", callback_data="admin_back")]
    ])
    )

    @dp.callback_query(F.data == "create_promo")
    async def create_promo_handler(callback: types.CallbackQuery):
    """Создание промокода"""
    await callback.message.edit_text(
    "Введите данные промокода в формате:\n"
    "<code>КОД СУММА [ЛИМИТ]</code>\n\n"
    "Пример:\n"
    "<code>SUMMER100 100 50</code>\n"
    "<code>WINTER200 200</code> (безлимитный)",
    parse_mode="HTML"
    )
    await dp.storage.set_data(callback.from_user.id, {"action": "create_promo"})

    @dp.callback_query(F.data == "give_promo")
    async def give_promo_handler(callback: types.CallbackQuery):
    """Выдача промокода пользователю"""
    await callback.message.edit_text(
    "Введите ID пользователя и промокод через пробел:\n"
    "<code>123456789 SUMMER100</code>",
    parse_mode="HTML"
    )
    await dp.storage.set_data(callback.from_user.id, {"action": "give_promo"})

    @dp.message(F.text.regexp(r'^[A-Z0-9]+ \d+(?: \d+)?$'))
    async def process_create_promo(message: types.Message):
    """Обработка создания промокода"""
    if message.from_user.id != ADMIN_ID:
    return

    user_data = await dp.storage.get_data(message.from_user.id)
    if user_data.get("action") != "create_promo":
    return

    parts = message.text.split()
    code = parts[0]
    amount = float(parts[1])
    uses_left = int(parts[2]) if len(parts) > 2 else None

    conn = sqlite3.connect('bot.db')
    c = conn.cursor()
    try:
    c.execute(
    "INSERT INTO promo_codes (code, amount, uses_left) VALUES (?, ?, ?)",
    (code, amount, uses_left)
    )
    conn.commit()

    await message.answer(
    f" Промокод создан:\n"
    f"Код: <code>{code}</code>\n"
    f"Сумма: {amount}₽\n"
    f"Лимит: {uses_left or 'безлимитный'}",
    parse_mode="HTML",
    reply_markup=admin_menu()
    )
    except sqlite3.IntegrityError:
    await message.answer(" Промокод уже существует")
    finally:
    conn.close()
    await dp.storage.set_data(message.from_user.id, {})

    @dp.message(F.text.regexp(r'^\d+ [A-Z0-9]+$'))
    async def process_give_promo(message: types.Message):
    """Обработка выдачи промокода"""
    if message.from_user.id != ADMIN_ID:
    return

    user_data = await dp.storage.get_data(message.from_user.id)
    if user_data.get("action") != "give_promo":
    return

    user_id, promo_code = message.text.split()
    user_id = int(user_id)

    promo_data = get_promo_code(promo_code)
    if not promo_data:
    await message.answer(" Промокод не найден")
    return

    promo_id, amount, uses_left = promo_data

    if uses_left is not None and uses_left <= 0:
    await message.answer(" Лимит использования промокода исчерпан")
    return

    if not use_promo_code(user_id, promo_id):
    await message.answer(" Ошибка при активации промокода")
    return

    # Зачисляем средства
    update_user_balance(user_id, amount)

    try:
    await bot.send_message(
    user_id,
    f" Администратор выдал вам промокод!\n"
    f"На ваш баланс зачислено: {amount}₽\n"
    f"Текущий баланс: {get_user_balance(user_id)}₽"
    )
    await message.answer(
    f" Пользователю {user_id} успешно выдан промокод {promo_code} на {amount}₽",
    reply_markup=admin_menu()
    )
    except Exception as e:
    await message.answer(f" Не удалось уведомить пользователя: {str(e)}")

    await dp.storage.set_data(message.from_user.id, {})

    # ===================== ЗАПУСК БОТА =====================
    async def on_startup():
    """Действия при запуске бота"""
    scheduler.add_job(check_expired_subscriptions, 'interval', hours=1)
    scheduler.start()
    logger.info("Бот запущен")

    if __name__ == '__main__':
    dp.startup.register(on_startup)
    asyncio.run(dp.start_polling(bot))
    **Основные функции бота**

    #### **Для пользователей**
    - **Личный кабинет**
    - Дата регистрации
    - Текущий баланс
    - Список активных подписок
    - **Покупка подписок**
    - 1 День (гибкие тарифы)
    - 7 Неделя
    - Месяц
    - **Промокоды**
    - Активация в один клик
    - Уведомления о зачислении
    - **Автоматический доступ**
    - Мгновенное вступление в канал после оплаты
    - Уведомление об окончании подписки

    #### **Для администратора**
    - **Управление каналами**
    - Добавление новых каналов
    - Редактирование описаний
    - Изменение пригласительных ссылок
    - **Управление ценами**
    - Гибкая настройка для каждого канала
    - Разные тарифные планы
    - **Промокоды**
    - Создание (с лимитом или без)
    - Ручная выдача пользователям
    - Статистика использования
    - **Рассылки**
    - Персональные уведомления
    - Массовые рассылки
    - **Аналитика**
    - Общая статистика продаж
    - Активные подписки
    - Доходы

    #### **Системные функции**
    - **Защита от спама**
    - Ограничение 5 сек между действиями
    - Автоматический бан флуда
    - **Автоматизация**
    - Проверка истекших подписок (каждый час)
    - Удаление из каналов при окончании
    - **База данных**
    - SQLite для хранения данных
    - Кэширование часто используемых запросов
    - **Резервное копирование**
    - Рекомендации по настройке бэкапов​


    **Как развернуть?**
    1. Установите зависимости:

    pip install aiogram sqlite3 apscheduler python-dotenv

    2. Настройте конфиг (`.env`):

    BOT_TOKEN=ваш_токен_от_BotFather
    ADMIN_ID=ваш_ID_в_Telegram

    3. Запустите бота:
    ```bash
    python bot.py
    ```​
     
    1. n1s_01
      0О0, опять вайбкодеры окружают
    2. PersonOfInterest
      n1s_01, дада, комменты чатажипити
    3. n1s_01
      PersonOfInterest, да достаточно посомтреть на то что он написал хыхы
  2. cedro
    cedro 4 май 2025 A clear conscience is a soft pillow. 740 25 июн 2020
    сколько комментариев в коде.. боюсь тебе помогал добрый чатгпт
     
Top