Бот для автоматического поднятия тем Описание проекта Этот код представляет собой Telegram-бота, созданного с использованием библиотеки aiogram, который предназначен для автоматического "поднятия" (bumping) тем на различных платформах через API. Бот поддерживает настройку интервалов поднятия, управление списком тем и обработку ошибок с помощью библиотеки tenacity для повторных попыток при сбоях. Основные возможности Установка API-ключа для авторизации запросов Настройка интервала автоматического поднятия тем Добавление и удаление URL тем для поднятия Отображение списка добавленных тем Обработка ошибок с автоматическими повторными попытками Ограничение доступа только для определённых пользователей Код программы import subprocess import sys import asyncio import json import re REQUIRED_PACKAGES = [ 'aiogram', 'requests', 'tenacity' ] def install_packages(): for package in REQUIRED_PACKAGES: try: __import__(package) except ImportError: print(f"Устанавливается {package}...") subprocess.check_call([sys.executable, "-m", "pip", "install", package]) print(f"{package} успешно установлен") install_packages() from aiogram import Bot, Dispatcher, types from aiogram.filters import Command, BaseFilter from aiogram.types import BotCommand import requests from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type API_TOKEN = '7087310072:AAE-Yw_U6clWoJmW3JgPeX67rjO6OiaYhQk' THREADS_FILE = 'threads.json' API_KEY_FILE = 'api_key.json' INTERVAL_FILE = 'interval.json' ALLOWED_USERS = [8029299947, 1062561626] CHAT_ID = -1002470589466 CHAT_THREAD = 41656 def load_api_key(): try: with open(API_KEY_FILE, 'r') as f: data = json.load(f) return data.get('api_key') except FileNotFoundError: default_data = {"api_key": None} with open(API_KEY_FILE, 'w') as f: json.dump(default_data, f) return None except json.JSONDecodeError: return None def save_api_key(key): with open(API_KEY_FILE, 'w') as f: json.dump({'api_key': key}, f) def load_interval(): try: with open(INTERVAL_FILE, 'r') as f: data = json.load(f) return data.get('interval', 30) except FileNotFoundError: default_data = {"interval": 30} with open(INTERVAL_FILE, 'w') as f: json.dump(default_data, f) return 30 except json.JSONDecodeError: return 30 def save_interval(interval): with open(INTERVAL_FILE, 'w') as f: json.dump({'interval': interval}, f) API_KEY = load_api_key() INTERVAL = load_interval() bot = Bot(token=API_TOKEN) dp = Dispatcher() def load_threads(): try: with open(THREADS_FILE, 'r') as f: return json.load(f) except FileNotFoundError: default_threads = [] with open(THREADS_FILE, 'w') as f: json.dump(default_threads, f) return default_threads except json.JSONDecodeError: return [] def save_threads(threads): with open(THREADS_FILE, 'w') as f: json.dump(threads, f) def extract_wait_time(error_message): match = re.search(r'(\d+ \u0434\u0435\u043d\u044c)?\s*(\d+ \u0447\u0430\u0441\u043e\u0432)?\s*(\d+ \u043c\u0438\u043d\u0443\u0442)?\s*(\d+ \u0441\u0435\u043a\u0443\u043d\u0434)?', error_message) if match: days = match.group(1) if match.group(1) else "0 дней" hours = match.group(2) if match.group(2) else "0 часов" minutes = match.group(3) if match.group(3) else "0 минут" seconds = match.group(4) if match.group(4) else "0 секунд" return f"{days} {hours} {minutes} {seconds}" return "Не удалось извлечь время ожидания" @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(requests.exceptions.ConnectionError)) async def bump_thread(url, chat_id, topic_id=None): if not API_KEY: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text="API_KEY не установлен. Используйте /setkey для установки.") return headers = { "accept": "application/json", "authorization": f"Bearer {API_KEY}" } try: response = requests.post(url, headers=headers, timeout=10) response.raise_for_status() response_json = response.json() if "errors" in response_json: error_message = response_json['errors'][0] wait_time = extract_wait_time(error_message) await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Ошибка при поднятии {url}\nВремя ожидания: {wait_time}") else: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Тема {url} успешно поднята.") except requests.exceptions.ConnectionError as e: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Ошибка подключения к {url}: {str(e)}. Повторные попытки исчерпаны.") raise except requests.exceptions.Timeout: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Тайм-аут при запросе к {url}. Повторные попытки исчерпаны.") except requests.exceptions.RequestException as e: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Ошибка HTTP при поднятии {url}: {str(e)}") except json.JSONDecodeError: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Неожиданный ответ от {url}") except Exception as e: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Неизвестная ошибка при поднятии {url}: {str(e)}") class UserFilter(BaseFilter): def __init__(self, allowed_users: list): self.allowed_users = allowed_users async def __call__(self, message: types.Message) -> bool: return message.from_user.id in self.allowed_users @dp.message(Command(commands=['start'])) async def start_command(message: types.Message): welcome_text = """Привет! Я бот для автоматического поднятия тем. Вот доступные команды: /start - Показать это сообщение /setkey <ключ> - Установить API-ключ /setinterval <минуты> - Установить интервал поднятия /add <url> - Добавить тему для поднятия /remove <url> - Удалить тему /list - Показать список ваших тем""" await message.reply(welcome_text) @dp.message(Command(commands=['setkey']), UserFilter(ALLOWED_USERS)) async def set_api_key(message: types.Message): global API_KEY parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Пожалуйста, укажите новый API-ключ после команды /setkey.") return new_key = parts[1] API_KEY = new_key save_api_key(new_key) await message.reply("API-ключ успешно обновлен.") @dp.message(Command(commands=['setinterval']), UserFilter(ALLOWED_USERS)) async def set_interval(message: types.Message): global INTERVAL parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Пожалуйста, укажите число минут после команды /setinterval.") return try: new_interval = int(parts[1]) if new_interval < 1: raise ValueError INTERVAL = new_interval save_interval(new_interval) await message.reply(f"Интервал установлен на {new_interval} минут.") except ValueError: await message.reply("Пожалуйста, укажите корректное число минут (например, /setinterval 45).") @dp.message(Command(commands=['add']), UserFilter(ALLOWED_USERS)) async def add_thread(message: types.Message): parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Укажите URL темы после команды /add.") return url = parts[1] threads = load_threads() chat_id = message.chat.id if any(thread['url'] == url and thread['chat_id'] == chat_id for thread in threads): await message.reply("Эта тема уже добавлена для вашего чата.") else: threads.append({'url': url, 'chat_id': chat_id, 'topic_id': message.message_thread_id}) save_threads(threads) await message.reply(f"Тема {url} добавлена для вашего чата.") @dp.message(Command(commands=['remove']), UserFilter(ALLOWED_USERS)) async def remove_thread(message: types.Message): parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Укажите URL темы после команды /remove.") return url = parts[1] threads = load_threads() chat_id = message.chat.id for thread in threads[:]: if thread['url'] == url and thread['chat_id'] == chat_id: threads.remove(thread) save_threads(threads) await message.reply(f"Тема {url} удалена из вашего чата.") return await message.reply("Эта тема не найдена в вашем списке.") @dp.message(Command(commands=['list']), UserFilter(ALLOWED_USERS)) async def list_threads(message: types.Message): threads = load_threads() chat_id = message.chat.id user_threads = [thread['url'] for thread in threads if thread['chat_id'] == chat_id] if user_threads: await message.reply("Ваши темы:\n" + "\n".join(user_threads)) else: await message.reply("У вас нет добавленных тем.") async def bump_threads_periodically(): while True: threads = load_threads() for thread in threads: try: await bump_thread(thread['url'], thread['chat_id'], thread.get('topic_id')) except Exception as e: print(f"Ошибка в цикле поднятия темы {thread['url']}: {str(e)}") await asyncio.sleep(10) print(f"Ожидание {INTERVAL} минут перед следующим циклом...") await asyncio.sleep(INTERVAL * 60) async def set_bot_commands(): commands = [ BotCommand(command="/start", description="Начать работу с ботом"), BotCommand(command="/setkey", description="Установить API-ключ"), BotCommand(command="/setinterval", description="Установить интервал поднятия"), BotCommand(command="/add", description="Добавить тему"), BotCommand(command="/remove", description="Удалить тему"), BotCommand(command="/list", description="Показать список тем") ] await bot.set_my_commands(commands) async def main(): await set_bot_commands() asyncio.create_task(bump_threads_periodically()) await dp.start_polling(bot) if __name__ == '__main__': asyncio.run(main()) Code import subprocess import sys import asyncio import json import re REQUIRED_PACKAGES = [ 'aiogram', 'requests', 'tenacity' ] def install_packages(): for package in REQUIRED_PACKAGES: try: __import__(package) except ImportError: print(f"Устанавливается {package}...") subprocess.check_call([sys.executable, "-m", "pip", "install", package]) print(f"{package} успешно установлен") install_packages() from aiogram import Bot, Dispatcher, types from aiogram.filters import Command, BaseFilter from aiogram.types import BotCommand import requests from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type API_TOKEN = '7087310072:AAE-Yw_U6clWoJmW3JgPeX67rjO6OiaYhQk' THREADS_FILE = 'threads.json' API_KEY_FILE = 'api_key.json' INTERVAL_FILE = 'interval.json' ALLOWED_USERS = [8029299947, 1062561626] CHAT_ID = -1002470589466 CHAT_THREAD = 41656 def load_api_key(): try: with open(API_KEY_FILE, 'r') as f: data = json.load(f) return data.get('api_key') except FileNotFoundError: default_data = {"api_key": None} with open(API_KEY_FILE, 'w') as f: json.dump(default_data, f) return None except json.JSONDecodeError: return None def save_api_key(key): with open(API_KEY_FILE, 'w') as f: json.dump({'api_key': key}, f) def load_interval(): try: with open(INTERVAL_FILE, 'r') as f: data = json.load(f) return data.get('interval', 30) except FileNotFoundError: default_data = {"interval": 30} with open(INTERVAL_FILE, 'w') as f: json.dump(default_data, f) return 30 except json.JSONDecodeError: return 30 def save_interval(interval): with open(INTERVAL_FILE, 'w') as f: json.dump({'interval': interval}, f) API_KEY = load_api_key() INTERVAL = load_interval() bot = Bot(token=API_TOKEN) dp = Dispatcher() def load_threads(): try: with open(THREADS_FILE, 'r') as f: return json.load(f) except FileNotFoundError: default_threads = [] with open(THREADS_FILE, 'w') as f: json.dump(default_threads, f) return default_threads except json.JSONDecodeError: return [] def save_threads(threads): with open(THREADS_FILE, 'w') as f: json.dump(threads, f) def extract_wait_time(error_message): match = re.search(r'(\d+ \u0434\u0435\u043d\u044c)?\s*(\d+ \u0447\u0430\u0441\u043e\u0432)?\s*(\d+ \u043c\u0438\u043d\u0443\u0442)?\s*(\d+ \u0441\u0435\u043a\u0443\u043d\u0434)?', error_message) if match: days = match.group(1) if match.group(1) else "0 дней" hours = match.group(2) if match.group(2) else "0 часов" minutes = match.group(3) if match.group(3) else "0 минут" seconds = match.group(4) if match.group(4) else "0 секунд" return f"{days} {hours} {minutes} {seconds}" return "Не удалось извлечь время ожидания" @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(requests.exceptions.ConnectionError)) async def bump_thread(url, chat_id, topic_id=None): if not API_KEY: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text="API_KEY не установлен. Используйте /setkey для установки.") return headers = { "accept": "application/json", "authorization": f"Bearer {API_KEY}" } try: response = requests.post(url, headers=headers, timeout=10) response.raise_for_status() response_json = response.json() if "errors" in response_json: error_message = response_json['errors'][0] wait_time = extract_wait_time(error_message) await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Ошибка при поднятии {url}\nВремя ожидания: {wait_time}") else: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Тема {url} успешно поднята.") except requests.exceptions.ConnectionError as e: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Ошибка подключения к {url}: {str(e)}. Повторные попытки исчерпаны.") raise except requests.exceptions.Timeout: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Тайм-аут при запросе к {url}. Повторные попытки исчерпаны.") except requests.exceptions.RequestException as e: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Ошибка HTTP при поднятии {url}: {str(e)}") except json.JSONDecodeError: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Неожиданный ответ от {url}") except Exception as e: await bot.send_message(chat_id=CHAT_ID, message_thread_id=CHAT_THREAD, text=f"Неизвестная ошибка при поднятии {url}: {str(e)}") class UserFilter(BaseFilter): def __init__(self, allowed_users: list): self.allowed_users = allowed_users async def __call__(self, message: types.Message) -> bool: return message.from_user.id in self.allowed_users @dp.message(Command(commands=['start'])) async def start_command(message: types.Message): welcome_text = """Привет! Я бот для автоматического поднятия тем. Вот доступные команды: /start - Показать это сообщение /setkey <ключ> - Установить API-ключ /setinterval <минуты> - Установить интервал поднятия /add <url> - Добавить тему для поднятия /remove <url> - Удалить тему /list - Показать список ваших тем""" await message.reply(welcome_text) @dp.message(Command(commands=['setkey']), UserFilter(ALLOWED_USERS)) async def set_api_key(message: types.Message): global API_KEY parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Пожалуйста, укажите новый API-ключ после команды /setkey.") return new_key = parts[1] API_KEY = new_key save_api_key(new_key) await message.reply("API-ключ успешно обновлен.") @dp.message(Command(commands=['setinterval']), UserFilter(ALLOWED_USERS)) async def set_interval(message: types.Message): global INTERVAL parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Пожалуйста, укажите число минут после команды /setinterval.") return try: new_interval = int(parts[1]) if new_interval < 1: raise ValueError INTERVAL = new_interval save_interval(new_interval) await message.reply(f"Интервал установлен на {new_interval} минут.") except ValueError: await message.reply("Пожалуйста, укажите корректное число минут (например, /setinterval 45).") @dp.message(Command(commands=['add']), UserFilter(ALLOWED_USERS)) async def add_thread(message: types.Message): parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Укажите URL темы после команды /add.") return url = parts[1] threads = load_threads() chat_id = message.chat.id if any(thread['url'] == url and thread['chat_id'] == chat_id for thread in threads): await message.reply("Эта тема уже добавлена для вашего чата.") else: threads.append({'url': url, 'chat_id': chat_id, 'topic_id': message.message_thread_id}) save_threads(threads) await message.reply(f"Тема {url} добавлена для вашего чата.") @dp.message(Command(commands=['remove']), UserFilter(ALLOWED_USERS)) async def remove_thread(message: types.Message): parts = message.text.split(maxsplit=1) if len(parts) < 2: await message.reply("Укажите URL темы после команды /remove.") return url = parts[1] threads = load_threads() chat_id = message.chat.id for thread in threads[:]: if thread['url'] == url and thread['chat_id'] == chat_id: threads.remove(thread) save_threads(threads) await message.reply(f"Тема {url} удалена из вашего чата.") return await message.reply("Эта тема не найдена в вашем списке.") @dp.message(Command(commands=['list']), UserFilter(ALLOWED_USERS)) async def list_threads(message: types.Message): threads = load_threads() chat_id = message.chat.id user_threads = [thread['url'] for thread in threads if thread['chat_id'] == chat_id] if user_threads: await message.reply("Ваши темы:\n" + "\n".join(user_threads)) else: await message.reply("У вас нет добавленных тем.") async def bump_threads_periodically(): while True: threads = load_threads() for thread in threads: try: await bump_thread(thread['url'], thread['chat_id'], thread.get('topic_id')) except Exception as e: print(f"Ошибка в цикле поднятия темы {thread['url']}: {str(e)}") await asyncio.sleep(10) print(f"Ожидание {INTERVAL} минут перед следующим циклом...") await asyncio.sleep(INTERVAL * 60) async def set_bot_commands(): commands = [ BotCommand(command="/start", description="Начать работу с ботом"), BotCommand(command="/setkey", description="Установить API-ключ"), BotCommand(command="/setinterval", description="Установить интервал поднятия"), BotCommand(command="/add", description="Добавить тему"), BotCommand(command="/remove", description="Удалить тему"), BotCommand(command="/list", description="Показать список тем") ] await bot.set_my_commands(commands) async def main(): await set_bot_commands() asyncio.create_task(bump_threads_periodically()) await dp.start_polling(bot) if __name__ == '__main__': asyncio.run(main())
K1p1k, гений, про какие повторные модули ты? Первая это проверка на наличие данных модулей, если нет, то установка, остальные обычные импорты, хуйню высрал ты