Загрузка...

Телеграм бот для автопостинга

Тема в разделе Python создана пользователем n1s_01 20 апр 2025. (поднята 5 май 2025) 204 просмотра

Загрузка...
  1. n1s_01
    n1s_01 Автор темы 20 апр 2025 :coder: n1s-01.click :coder: lolz.live/threads/8514735/ :coder: 471 15 май 2019
    Телеграм бот что бы автоматически подставлять вот такие теги
    [IMG]
    ⁡bot.py

    Python
    import asyncio
    import logging
    import re
    import sqlite3
    from datetime import datetime, timedelta, timezone
    from aiogram import Bot, Dispatcher, Router, F
    from aiogram.filters import Command
    from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
    from aiogram.fsm.context import FSMContext
    from aiogram.fsm.state import State, StatesGroup
    from aiogram.fsm.storage.memory import MemoryStorage
    from config import TOKEN
    from aiogram.client.default import DefaultBotProperties

    logging.basicConfig(level=logging.INFO)
    router = Router()
    msk_tz = timezone(timedelta(hours=3))

    def init_db():
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("""CREATE TABLE IF NOT EXISTS admins (
    user_id INTEGER PRIMARY KEY
    )""")
    c.execute("""CREATE TABLE IF NOT EXISTS channels (
    channel_id INTEGER PRIMARY KEY,
    channel_name TEXT
    )""")
    c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='settings';")
    if c.fetchone():
    try:
    c.execute("SELECT channel_id, key, value FROM settings LIMIT 1")
    except sqlite3.OperationalError:
    logging.info("Dropping old 'settings' table schema.")
    c.execute("DROP TABLE settings")

    c.execute("""CREATE TABLE IF NOT EXISTS settings (
    channel_id INTEGER,
    key TEXT,
    value TEXT,
    PRIMARY KEY (channel_id, key)
    )""")

    c.execute("PRAGMA table_info(delayed_messages)")
    columns = [col[1] for col in c.fetchall()]
    if 'channel_id' not in columns:
    logging.info("Adding 'channel_id' column to 'delayed_messages' table.")
    c.execute("ALTER TABLE delayed_messages ADD COLUMN channel_id INTEGER")

    conn.commit()

    def is_admin(user_id: int) -> bool:
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
    return c.fetchone() is not None

    def add_admin(user_id: int):
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("INSERT OR IGNORE INTO admins (user_id) VALUES (?)", (user_id,))
    conn.commit()

    def get_admin_count() -> int:
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("SELECT COUNT(*) FROM admins")
    return c.fetchone()[0]

    def add_channel(channel_id: int, channel_name: str):
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("INSERT OR REPLACE INTO channels (channel_id, channel_name) VALUES (?, ?)", (channel_id, channel_name))
    c.execute("INSERT OR IGNORE INTO settings (channel_id, key, value) VALUES (?, ?, ?)",
    (channel_id, "footer", "<b>Разработано lzt.bet</b>"))
    conn.commit()

    def remove_channel(channel_id: int):
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("DELETE FROM channels WHERE channel_id = ?", (channel_id,))
    c.execute("DELETE FROM settings WHERE channel_id = ?", (channel_id,))
    c.execute("DELETE FROM delayed_messages WHERE channel_id = ?", (channel_id,))
    conn.commit()

    def list_channels() -> list[sqlite3.Row]:
    with sqlite3.connect("bot.db") as conn:
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute("SELECT channel_id, channel_name FROM channels ORDER BY channel_name")
    return c.fetchall()

    def get_footer(channel_id: int) -> str:
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("INSERT OR IGNORE INTO settings (channel_id, key, value) VALUES (?, ?, ?)",
    (channel_id, "footer", "<b>Разработано lzt.bet</b>"))
    c.execute("SELECT value FROM settings WHERE channel_id = ? AND key = ?", (channel_id, "footer"))
    result = c.fetchone()
    return result[0] if result else "<b>Разработано lzt.bet</b>"

    def save_footer(channel_id: int, footer: str):
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("INSERT OR REPLACE INTO settings (channel_id, key, value) VALUES (?, ?, ?)",
    (channel_id, "footer", footer))
    conn.commit()

    def save_delayed_message(channel_id: int, content: str, publish_time: datetime, message_type: str, file_id: str = None):
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("INSERT INTO delayed_messages (chat_id, content, publish_time, message_type, file_id, channel_id) VALUES (?, ?, ?, ?, ?, ?)",
    (channel_id, content, publish_time.isoformat(), message_type, file_id, channel_id))
    conn.commit()

    def delete_delayed_message(message_id: int):
    with sqlite3.connect("bot.db") as conn:
    c = conn.cursor()
    c.execute("DELETE FROM delayed_messages WHERE id = ?", (message_id,))
    conn.commit()

    def get_delayed_messages() -> list[sqlite3.Row]:
    with sqlite3.connect("bot.db") as conn:
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute("PRAGMA table_info(delayed_messages)")
    columns = [col[1] for col in c.fetchall()]
    if 'channel_id' in columns:
    c.execute("SELECT id, chat_id, content, publish_time, message_type, file_id, channel_id FROM delayed_messages")
    else:
    logging.warning("Delayed messages table missing 'channel_id'. Selecting without it.")
    c.execute("SELECT id, chat_id, content, publish_time, message_type, file_id FROM delayed_messages")
    return c.fetchall()

    class AdminStates(StatesGroup):
    adding_channel_id = State()
    adding_channel_name = State()

    class MessageStates(StatesGroup):
    selecting_channel = State()
    waiting_for_message = State()

    class DelayedStates(StatesGroup):
    selecting_channel = State()
    waiting_for_time = State()
    waiting_for_message = State()

    class SettingsStates(StatesGroup):
    selecting_channel = State()
    waiting_for_footer = State()

    def get_main_menu_keyboard():
    buttons = []
    channels = list_channels()
    row1 = []
    if channels:
    row1.append(InlineKeyboardButton(text=" Отправить", callback_data="message"))
    row1.append(InlineKeyboardButton(text=" Отложенное", callback_data="delayed"))
    row1.append(InlineKeyboardButton(text=" Настроить", callback_data="settings"))
    if row1:
    buttons.append(row1)

    row2 = []
    row2.append(InlineKeyboardButton(text=" Добавить канал", callback_data="add_channel"))
    if channels:
    row2.append(InlineKeyboardButton(text=" Удалить канал", callback_data="remove_channel"))
    if row2:
    buttons.append(row2)

    return InlineKeyboardMarkup(inline_keyboard=buttons)

    def get_channel_selection_keyboard(callback_prefix: str, buttons_per_row: int = 2) -> InlineKeyboardMarkup:
    channels = list_channels()
    buttons = []
    row = []
    for i, channel in enumerate(channels):
    row.append(InlineKeyboardButton(
    text=channel["channel_name"],
    callback_data=f"{callback_prefix}:{channel['channel_id']}"
    ))
    if len(row) == buttons_per_row or i == len(channels) - 1:
    buttons.append(row)
    row = []

    buttons.append([InlineKeyboardButton(text=" Отмена", callback_data="cancel_channel_select")])
    return InlineKeyboardMarkup(inline_keyboard=buttons)

    def get_channel_removal_keyboard(buttons_per_row: int = 2) -> InlineKeyboardMarkup:
    channels = list_channels()
    buttons = []
    row = []
    for i, channel in enumerate(channels):
    row.append(InlineKeyboardButton(
    text=f"Удалить {channel['channel_name']}",
    callback_data=f"confirm_remove:{channel['channel_id']}"
    ))
    if len(row) == buttons_per_row or i == len(channels) - 1:
    buttons.append(row)
    row = []

    buttons.append([InlineKeyboardButton(text=" Отмена", callback_data="cancel_channel_select")])
    return InlineKeyboardMarkup(inline_keyboard=buttons)

    cancel_keyboard = InlineKeyboardMarkup(inline_keyboard=[
    [InlineKeyboardButton(text=" Отмена", callback_data="cancel_input")]
    ])

    async def send_message(bot: Bot, chat_id: int, content: str, message_type: str, file_id: str = None):
    try:
    if message_type == "text":
    await bot.send_message(chat_id, content, parse_mode="HTML")
    elif message_type == "photo":
    await bot.send_photo(chat_id, file_id, caption=content, parse_mode="HTML")
    elif message_type == "video":
    await bot.send_video(chat_id, file_id, caption=content, parse_mode="HTML")
    elif message_type == "document":
    await bot.send_document(chat_id, file_id, caption=content, parse_mode="HTML")
    logging.info(f"Message sent to {chat_id} (type: {message_type})")
    except Exception as e:
    logging.error(f"Failed to send message to {chat_id}: {e}")
    raise

    @router.message(Command("start"))
    async def start_command(message: Message):
    user_id = message.from_user.id
    if get_admin_count() == 0:
    add_admin(user_id)
    await message.answer(
    " Вы назначены администратором бота!\n\n"
    "Теперь вы можете управлять каналами и настройками.\n"
    "Добавьте свой первый канал для постинга.",
    reply_markup=get_main_menu_keyboard()
    )
    elif is_admin(user_id):
    await message.answer(
    " Бот-редактор каналов\n\n"
    "Выберите действие:",
    reply_markup=get_main_menu_keyboard()
    )
    else:
    await message.answer(" У вас нет доступа к этому боту.")

    @router.callback_query(F.data == "add_channel")
    async def handle_add_channel_button(callback_query: CallbackQuery, state: FSMContext):
    if not is_admin(callback_query.from_user.id):
    return await callback_query.answer(" Доступ запрещен", show_alert=True)
    await callback_query.answer()
    await callback_query.message.edit_text(
    "Введите ID канала (например, -100123456789) или его @username.\n"
    "Убедитесь, что бот добавлен в этот канал как администратор с правами на публикацию.",
    reply_markup=cancel_keyboard
    )
    await state.set_state(AdminStates.adding_channel_id)

    @router.message(AdminStates.adding_channel_id)
    async def process_add_channel_id(message: Message, state: FSMContext):
    if not is_admin(message.from_user.id): return

    channel_id_str = message.text.strip()
    channel_id = None

    if channel_id_str.startswith("-") and channel_id_str[1:].isdigit():
    try: channel_id = int(channel_id_str)
    except ValueError: pass
    elif channel_id_str.startswith("@"):
    channel_id = channel_id_str
    else:
    await message.answer(" Неверный формат ID или @username канала. ID должен начинаться с -100..., username с @.")
    return

    try:
    chat = await message.bot.get_chat(chat_id=channel_id)
    actual_channel_id = chat.id
    channel_display_name = chat.title or chat.username

    existing_channels = list_channels()
    if any(ch['channel_id'] == actual_channel_id for ch in existing_channels):
    await message.answer(f" Канал '{channel_display_name}' уже добавлен.", reply_markup=get_main_menu_keyboard())
    await state.clear()
    return

    await state.update_data(channel_id=actual_channel_id, tentative_channel_name=channel_display_name)
    await message.answer(f"Канал '{channel_display_name}' найден. \nТеперь введите краткое имя для этого канала (для кнопок):", reply_markup=cancel_keyboard)
    await state.set_state(AdminStates.adding_channel_name)
    except Exception as e:
    logging.error(f"Error checking channel {channel_id_str}: {e}")
    await message.answer(f" Не удалось получить доступ к каналу '{channel_id_str}'.\nОшибка: {e}\nУбедитесь, что ID/имя верны и бот добавлен как администратор с правами публикации.")

    @router.message(AdminStates.adding_channel_name)
    async def process_add_channel_name(message: Message, state: FSMContext):
    if not is_admin(message.from_user.id): return

    channel_name = message.text.strip()
    if not channel_name:
    await message.answer(" Имя не может быть пустым.")
    return

    data = await state.get_data()
    channel_id = data.get("channel_id")
    if not channel_id:
    await message.answer("Произошла ошибка, ID канала не найден. Попробуйте добавить заново.", reply_markup=get_main_menu_keyboard())
    await state.clear()
    return

    add_channel(channel_id, channel_name)
    await message.answer(f" Канал '{channel_name}' (ID: {channel_id}) успешно добавлен!", reply_markup=get_main_menu_keyboard())
    await state.clear()

    @router.callback_query(F.data == "remove_channel")
    async def handle_remove_channel_button(callback_query: CallbackQuery, state: FSMContext):
    if not is_admin(callback_query.from_user.id):
    return await callback_query.answer(" Доступ запрещен", show_alert=True)

    await callback_query.answer()
    channels = list_channels()
    if not channels:
    await callback_query.message.edit_text("Нет добавленных каналов для удаления.", reply_markup=get_main_menu_keyboard())
    return

    await callback_query.message.edit_text(
    "Выберите канал для удаления:",
    reply_markup=get_channel_removal_keyboard()
    )

    @router.callback_query(F.data.startswith("confirm_remove:"))
    async def handle_confirm_remove_channel(callback_query: CallbackQuery, state: FSMContext):
    if not is_admin(callback_query.from_user.id):
    return await callback_query.answer(" Доступ запрещен", show_alert=True)

    try:
    channel_id_to_remove = int(callback_query.data.split(":")[1])
    except (IndexError, ValueError):
    await callback_query.answer("Ошибка ID канала.", show_alert=True)
    await callback_query.message.edit_text("Произошла ошибка.", reply_markup=get_main_menu_keyboard())
    return

    channel_name = f"ID {channel_id_to_remove}"
    for ch in list_channels():
    if ch['channel_id'] == channel_id_to_remove:
    channel_name = ch['channel_name']
    break

    remove_channel(channel_id_to_remove)
    await callback_query.answer(f"Канал {channel_name} удален.")
    await callback_query.message.edit_text(
    f" Канал '{channel_name}' удален.",
    reply_markup=get_main_menu_keyboard()
    )

    @router.callback_query(F.data == "message")
    async def handle_message_button(callback_query: CallbackQuery, state: FSMContext):
    if not is_admin(callback_query.from_user.id): return await callback_query.answer(" Доступ запрещен", show_alert=True)
    await callback_query.answer()
    channels = list_channels()
    if not channels:
    await callback_query.message.edit_text("Сначала добавьте канал с помощью кнопки ' Добавить канал'.", reply_markup=get_main_menu_keyboard())
    return

    await callback_query.message.edit_text(
    "Выберите канал для отправки сообщения:",
    reply_markup=get_channel_selection_keyboard("msg_ch")
    )
    await state.set_state(MessageStates.selecting_channel)

    @router.callback_query(MessageStates.selecting_channel, F.data.startswith("msg_ch:"))
    async def handle_message_channel_selected(callback_query: CallbackQuery, state: FSMContext):
    try:
    channel_id = int(callback_query.data.split(":")[1])
    await state.update_data(selected_channel_id=channel_id)
    await callback_query.answer()
    await callback_query.message.edit_text(
    " Отправьте сообщение (поддерживается HTML, фото, видео, файлы):",
    reply_markup=cancel_keyboard
    )
    await state.set_state(MessageStates.waiting_for_message)
    except (IndexError, ValueError):
    await callback_query.answer("Ошибка выбора канала.", show_alert=True)
    await callback_query.message.edit_text("Произошла ошибка.", reply_markup=get_main_menu_keyboard())
    await state.clear()

    @router.message(MessageStates.waiting_for_message)
    async def process_message(message: Message, state: FSMContext):
    data = await state.get_data()
    channel_id = data.get("selected_channel_id")
    if not channel_id:
    await message.answer(" Ошибка: Канал не выбран. Начните заново.", reply_markup=get_main_menu_keyboard())
    await state.clear()
    return

    footer = get_footer(channel_id)
    content = message.html_text if message.text else message.caption or ""
    content = f"{content}\n\n{footer}" if content else footer
    try:
    file_id = None
    message_type = "text"
    if message.photo:
    file_id = message.photo[-1].file_id
    message_type = "photo"
    elif message.video:
    file_id = message.video.file_id
    message_type = "video"
    elif message.document:
    file_id = message.document.file_id
    message_type = "document"

    await send_message(
    bot=message.bot,
    chat_id=channel_id,
    content=content,
    message_type=message_type,
    file_id=file_id
    )
    await message.answer(" Сообщение опубликовано", reply_markup=get_main_menu_keyboard())
    except Exception as e:
    logging.error(f"Error sending message to {channel_id}: {e}")
    await message.answer(f" Ошибка при отправке в канал {channel_id}: {str(e)}", reply_markup=get_main_menu_keyboard())
    finally:
    await state.clear()

    @router.callback_query(F.data == "delayed")
    async def handle_delayed_button(callback_query: CallbackQuery, state: FSMContext):
    if not is_admin(callback_query.from_user.id): return await callback_query.answer(" Доступ запрещен", show_alert=True)
    await callback_query.answer()
    channels = list_channels()
    if not channels:
    await callback_query.message.edit_text("Сначала добавьте канал с помощью кнопки ' Добавить канал'.", reply_markup=get_main_menu_keyboard())
    return

    await callback_query.message.edit_text(
    "Выберите канал для отложенного сообщения:",
    reply_markup=get_channel_selection_keyboard("del_ch")
    )
    await state.set_state(DelayedStates.selecting_channel)

    @router.callback_query(DelayedStates.selecting_channel, F.data.startswith("del_ch:"))
    async def handle_delayed_channel_selected(callback_query: CallbackQuery, state: FSMContext):
    try:
    channel_id = int(callback_query.data.split(":")[1])
    await state.update_data(selected_channel_id=channel_id)
    await callback_query.answer()
    now_utc = datetime.now(timezone.utc)
    now_msk = now_utc.astimezone(msk_tz)
    time_format = "%H:%M %d/%m/%Y"
    time_30m = (now_msk + timedelta(minutes=30)).strftime(time_format)
    time_6h = (now_msk + timedelta(hours=6)).strftime(time_format)
    time_24h = (now_msk + timedelta(hours=24)).strftime(time_format)

    prompt_text = (
    "Отправьте время отправки (МСК)\n"
    "Например:\n"
    f"<code>{time_30m}</code> - через 30 минут\n"
    f"<code>{time_6h}</code> - через 6 часов\n"
    f"<code>{time_24h}</code> - через 24 часа"
    )

    await callback_query.message.edit_text(
    prompt_text,
    parse_mode="HTML",
    reply_markup=cancel_keyboard
    )
    await state.set_state(DelayedStates.waiting_for_time)
    except (IndexError, ValueError):
    await callback_query.answer("Ошибка выбора канала.", show_alert=True)
    await callback_query.message.edit_text("Произошла ошибка.", reply_markup=get_main_menu_keyboard())
    await state.clear()

    @router.message(DelayedStates.waiting_for_time)
    async def process_time(message: Message, state: FSMContext):
    if not re.match(r"^\d{2}:\d{2} \d{2}/\d{2}/\d{4}$", message.text):
    now_utc = datetime.now(timezone.utc)
    now_msk = now_utc.astimezone(msk_tz)
    time_format = "%H:%M %d/%m/%Y"
    time_30m = (now_msk + timedelta(minutes=30)).strftime(time_format)
    time_6h = (now_msk + timedelta(hours=6)).strftime(time_format)
    time_24h = (now_msk + timedelta(hours=24)).strftime(time_format)
    error_prompt = (
    " Неверный формат. Используйте ЧЧ:ММ ДД/ММ/ГГГГ (МСК)\n"
    "Например:\n"
    f"<code>{time_30m}</code> - через 30 минут\n"
    f"<code>{time_6h}</code> - через 6 часов\n"
    f"<code>{time_24h}</code> - через 24 часа"
    )
    await message.answer(error_prompt, parse_mode="HTML")
    return
    try:
    publish_time_naive = datetime.strptime(message.text, "%H:%M %d/%m/%Y")
    now_msk_naive = datetime.now(timezone.utc).astimezone(msk_tz).replace(tzinfo=None)

    if publish_time_naive <= now_msk_naive:
    await message.answer(" Время должно быть в будущем (МСК)")
    return

    await state.update_data(publish_time=publish_time_naive)
    await message.answer(" Отправьте сообщение для отложенной публикации:", reply_markup=cancel_keyboard)
    await state.set_state(DelayedStates.waiting_for_message)
    except ValueError:
    await message.answer(" Неверная дата или время.")

    @router.message(DelayedStates.waiting_for_message)
    async def process_delayed_message(message: Message, state: FSMContext):
    data = await state.get_data()
    channel_id = data.get("selected_channel_id")
    publish_time = data.get("publish_time")

    if not channel_id or not publish_time:
    await message.answer(" Ошибка: Канал или время не установлены. Начните заново.", reply_markup=get_main_menu_keyboard())
    await state.clear()
    return

    footer = get_footer(channel_id)
    content = message.html_text if message.text else message.caption or ""
    content = f"{content}\n\n{footer}" if content else footer
    try:
    file_id = None
    message_type = "text"
    if message.photo:
    file_id = message.photo[-1].file_id
    message_type = "photo"
    elif message.video:
    file_id = message.video.file_id
    message_type = "video"
    elif message.document:
    file_id = message.document.file_id
    message_type = "document"

    save_delayed_message(channel_id, content, publish_time, message_type, file_id)
    await message.answer(
    f" Запланировано на {publish_time.strftime('%Y/%m/%d %H:%M')} для канала.",
    reply_markup=get_main_menu_keyboard()
    )
    except Exception as e:
    logging.error(f"Error scheduling message for {channel_id}: {e}")
    await message.answer(f" Ошибка при планировании: {str(e)}", reply_markup=get_main_menu_keyboard())
    finally:
    await state.clear()

    @router.callback_query(F.data == "settings")
    async def handle_settings_button(callback_query: CallbackQuery, state: FSMContext):
    if not is_admin(callback_query.from_user.id): return await callback_query.answer(" Доступ запрещен", show_alert=True)
    await callback_query.answer()
    channels = list_channels()
    if not channels:
    await callback_query.message.edit_text("Сначала добавьте канал с помощью кнопки ' Добавить канал'.", reply_markup=get_main_menu_keyboard())
    return

    await callback_query.message.edit_text(
    "Выберите канал для настройки футера:",
    reply_markup=get_channel_selection_keyboard("set_ch")
    )
    await state.set_state(SettingsStates.selecting_channel)

    @router.callback_query(SettingsStates.selecting_channel, F.data.startswith("set_ch:"))
    async def handle_settings_channel_selected(callback_query: CallbackQuery, state: FSMContext):
    try:
    channel_id = int(callback_query.data.split(":")[1])
    await state.update_data(selected_channel_id=channel_id)
    footer = get_footer(channel_id)
    await callback_query.answer()
    await callback_query.message.edit_text(
    f" Текущий футер для канала:\n{footer}\n\n"
    "Отправьте новый текст футера (поддерживается HTML):",
    parse_mode="HTML",
    reply_markup=cancel_keyboard
    )
    await state.set_state(SettingsStates.waiting_for_footer)
    except (IndexError, ValueError):
    await callback_query.answer("Ошибка выбора канала.", show_alert=True)
    await callback_query.message.edit_text("Произошла ошибка.", reply_markup=get_main_menu_keyboard())
    await state.clear()

    @router.message(SettingsStates.waiting_for_footer, F.text)
    async def process_footer(message: Message, state: FSMContext):
    data = await state.get_data()
    channel_id = data.get("selected_channel_id")
    if not channel_id:
    await message.answer(" Ошибка: Канал не выбран. Начните заново.", reply_markup=get_main_menu_keyboard())
    await state.clear()
    return

    footer = message.html_text
    save_footer(channel_id, foot
    ⁡token.py

    Python
    TOKEN = ""

    [IMG]
     
    20 апр 2025 Изменено
Top