Скриншот/видео недочета: Полная ссылка на страницу, где возникает проблема: lolz.live Как воспроизвести недочет: апи дает сообщение о успеxе отправки, а по факту сообщения не отправляется есть 2 кода. которые работали идеально до вчерашнего дня, но перестали по непонятной причине чатгпт import sys import requests import time import re from time import sleep class Lolz: USERNAME = "" BLACK_LIST = [] LIKES_LIM = 0 MESSAGES_LIM = 0 TOKEN = "" def __init__(self): self.ses = requests.Session() self.ses.headers['Authorization'] = 'Bearer ' + self.TOKEN try: self.ses.post('https://api.lolz.live/', timeout=10) sleep(.5) except Exception as e: print(f"Ошибка при подключении: {e}") def get_conversations(self): return self.ses.get('https://prod-api.zelenka.guru/conversations', timeout=10).json() def get_thread_info(self, tid): return self.ses.get(f'https://prod-api.zelenka.guru/threads/{tid}', timeout=10).json() def get_thread(self, m): thread_id = m['message_body'] match = re.search(r"threads/(\d+)", thread_id) thread_info = self.get_thread_info(match.group(1))['thread'] sleep(.5) creator = thread_info['creator_username'] if creator != Lolz.USERNAME and creator not in Lolz.BLACK_LIST: return None text = thread_info['first_post']['post_body_html'] if 'quoteContainer hideContainer' not in text: return None text = ''.join( clean(text.split('<blockquote class="quoteContainer hideContainer">')[-1].split( '</blockquote>')[0]).split( 'Нажмите, чтобы раскрыть...')) return text def get_user(self, link): return self.ses.get(link, timeout=10).json() def send_msg(self, user, msg): data = {'conversation_id': user, 'message_body': msg} res = self.ses.post(f'https://prod-api.zelenka.guru/conversation-messages', data=data, timeout=10) return res def get_all_msg(self, perm): return self.ses.get(perm['links']['messages'], timeout=10).json() def clean(raw_html): first_str = re.compile('<.*?>') cleaned_text = re.sub(first_str, '', raw_html) return cleaned_text def pool(): conversations = lol.get_conversations()['conversations'] sleep(.5) for elem in conversations: messages = lol.get_all_msg(elem) sleep(.5) to_do = [] for elem1 in messages['messages'][:5]: if elem1['creator_username'] == Lolz.USERNAME: break to_do.append(elem1) to_do.reverse() for elem1 in to_do: if re.search(r'threads/(\d+)', elem1['message_body']): try: user = lol.get_user(elem['first_message']['links']['creator']) sleep(.5) user_data = user.get('user', {}) # Проверка по количеству лайков if user_data.get('user_like_count', 100) < Lolz.LIKES_LIM: continue # Проверка по количеству сообщений (новая проверка) if user_data.get('message_count', 100) < Lolz.MESSAGES_LIM: continue msg = lol.get_thread(elem1) sleep(.5) if msg is None: continue # Отправка основного сообщения print(msg, user_data.get('username', "hidden"), lol.send_msg(elem['conversation_id'], msg)) time.sleep(10) # Лимит API # Отправка дополнительного сообщения thanks_msg = "Спасибо, что зашел ко мне в тему. Отпиши в тему, что получил личный и обязательно свое мнение/впечатление о теме (необязательно положительное)" print(lol.send_msg(elem['conversation_id'], thanks_msg)) time.sleep(10) # Лимит API except Exception as e: print(e) else: message_id = str(elem1['message_id']) try: with open("log_other_msg.txt", "r", encoding="utf-8") as f: old_data = f.read() except FileNotFoundError: old_data = "" if message_id in old_data: continue new_entry = ( f"Username: {elem1['creator_username']}\n" f"Message: {elem1['message_body']}\n" f"message_id: {message_id}\n" "________\n") with open("log_other_msg.txt", "w", encoding="utf-8") as f: f.write(new_entry + old_data) time.sleep(.5) print("Сплю 120 сек") sleep(120) if __name__=='__main__': lol = Lolz() while True: try: pool() except Exception as e: print(e) time.sleep(10) Python import sys import requests import time import re from time import sleep class Lolz: USERNAME = "" BLACK_LIST = [] LIKES_LIM = 0 MESSAGES_LIM = 0 TOKEN = "" def __init__(self): self.ses = requests.Session() self.ses.headers['Authorization'] = 'Bearer ' + self.TOKEN try: self.ses.post('https://api.lolz.live/', timeout=10) sleep(.5) except Exception as e: print(f"Ошибка при подключении: {e}") def get_conversations(self): return self.ses.get('https://prod-api.zelenka.guru/conversations', timeout=10).json() def get_thread_info(self, tid): return self.ses.get(f'https://prod-api.zelenka.guru/threads/{tid}', timeout=10).json() def get_thread(self, m): thread_id = m['message_body'] match = re.search(r"threads/(\d+)", thread_id) thread_info = self.get_thread_info(match.group(1))['thread'] sleep(.5) creator = thread_info['creator_username'] if creator != Lolz.USERNAME and creator not in Lolz.BLACK_LIST: return None text = thread_info['first_post']['post_body_html'] if 'quoteContainer hideContainer' not in text: return None text = ''.join( clean(text.split('<blockquote class="quoteContainer hideContainer">')[-1].split( '</blockquote>')[0]).split( 'Нажмите, чтобы раскрыть...')) return text def get_user(self, link): return self.ses.get(link, timeout=10).json() def send_msg(self, user, msg): data = {'conversation_id': user, 'message_body': msg} res = self.ses.post(f'https://prod-api.zelenka.guru/conversation-messages', data=data, timeout=10) return res def get_all_msg(self, perm): return self.ses.get(perm['links']['messages'], timeout=10).json() def clean(raw_html): first_str = re.compile('<.*?>') cleaned_text = re.sub(first_str, '', raw_html) return cleaned_text def pool(): conversations = lol.get_conversations()['conversations'] sleep(.5) for elem in conversations: messages = lol.get_all_msg(elem) sleep(.5) to_do = [] for elem1 in messages['messages'][:5]: if elem1['creator_username'] == Lolz.USERNAME: break to_do.append(elem1) to_do.reverse() for elem1 in to_do: if re.search(r'threads/(\d+)', elem1['message_body']): try: user = lol.get_user(elem['first_message']['links']['creator']) sleep(.5) user_data = user.get('user', {}) # Проверка по количеству лайков if user_data.get('user_like_count', 100) < Lolz.LIKES_LIM: continue # Проверка по количеству сообщений (новая проверка) if user_data.get('message_count', 100) < Lolz.MESSAGES_LIM: continue msg = lol.get_thread(elem1) sleep(.5) if msg is None: continue # Отправка основного сообщения print(msg, user_data.get('username', "hidden"), lol.send_msg(elem['conversation_id'], msg)) time.sleep(10) # Лимит API # Отправка дополнительного сообщения thanks_msg = "Спасибо, что зашел ко мне в тему. Отпиши в тему, что получил личный и обязательно свое мнение/впечатление о теме (необязательно положительное)" print(lol.send_msg(elem['conversation_id'], thanks_msg)) time.sleep(10) # Лимит API except Exception as e: print(e) else: message_id = str(elem1['message_id']) try: with open("log_other_msg.txt", "r", encoding="utf-8") as f: old_data = f.read() except FileNotFoundError: old_data = "" if message_id in old_data: continue new_entry = ( f"Username: {elem1['creator_username']}\n" f"Message: {elem1['message_body']}\n" f"message_id: {message_id}\n" "________\n") with open("log_other_msg.txt", "w", encoding="utf-8") as f: f.write(new_entry + old_data) time.sleep(.5) print("Сплю 120 сек") sleep(120) if __name__=='__main__': lol = Lolz() while True: try: pool() except Exception as e: print(e) time.sleep(10) гемини (новая версия скрипта) import asyncio import logging import re import sys from time import sleep # --- Настройка логирования в файл --- logger = logging.getLogger() logger.setLevel(logging.INFO) for handler in logger.handlers[:]: logger.removeHandler(handler) file_handler = logging.FileHandler('log.txt', mode='w', encoding='utf-8') formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) logger.addHandler(file_handler) # --- Конец настройки --- try: from LOLZTEAM.Client import Forum except ImportError: logging.error("Библиотека LOLZTEAM не установлена. Выполните: pip install LOLZTEAM") sys.exit() class Lolz: USERNAME = "Победилдо" BLACK_LIST = [] LIKES_LIM = 0 MESSAGES_LIM = 0 # Ваш токен TOKEN = "" def __init__(self): self.forum = Forum(token=self.TOKEN) self.forum.settings.logger.disable() self.forum.settings.delay.enable() logging.info("Клиент Lolz инициализирован.") async def send_msg(self, conversation_id, msg): """ ИСПРАВЛЕНО: Отправляет сообщение через универсальный метод forum.request с правильным параметром 'message_body' вместо 'message'. """ logging.info(f"Отправляю сообщение в беседу #{conversation_id}. Содержимое: '{msg[:75].replace(chr(10), ' ')}...'") # Формируем данные с правильным ключом 'message_body' payload = { 'conversation_id': conversation_id, 'message_body': msg } # Используем универсальный метод request, передавая данные как form-data response = await self.forum.request("POST", "/conversation-messages", data=payload) logging.info(f"Ответ от API: Статус {response.status_code}") try: response_json = response.json() logging.info(f"Ответ от API (JSON): {response_json}") # Проверяем, что в ответе теперь есть созданное сообщение if "message" in response_json: logging.info(f"Сообщение #{response_json['message']['message_id']} успешно создано.") elif "error" in response_json or "errors" in response_json: error_message = response_json.get("error") or response_json.get("errors") logging.error(f"!!! API ВЕРНУЛО ОШИБКУ: {error_message} !!!") else: logging.warning("API не вернуло ни сообщение, ни ошибку. Ответ был странным.") except Exception: logging.error(f"Не удалось прочитать JSON из ответа API. Тело ответа: {response.text}") return response async def get_all_msg(self, conversation_id): logging.info(f"Получаю сообщения из беседы #{conversation_id}") response = await self.forum.request("GET", f"/conversations/{conversation_id}/messages") return response.json() async def get_conversations(self): logging.info("Получаю список бесед...") response = await self.forum.conversations.list() if response.status_code == 200: logging.info(f"Список бесед успешно получен. Найдено {len(response.json().get('conversations', []))} бесед.") else: logging.error(f"Ошибка при получении бесед: {response.status_code} - {response.text}") return response.json() async def get_thread_info(self, tid): logging.info(f"Получаю информацию о теме #{tid}") response = await self.forum.threads.get(thread_id=tid) return response.json() async def get_thread(self, m): thread_id_match = re.search(r"threads/(\d+)", m['message_body']) if not thread_id_match: return None thread_id = int(thread_id_match.group(1)) thread_info_response = await self.get_thread_info(thread_id) thread_info = thread_info_response.get('thread') if not thread_info: logging.warning(f"Не удалось получить информацию для темы #{thread_id}") return None creator = thread_info.get('creator_username') if creator != self.USERNAME and creator in self.BLACK_LIST: logging.info(f"Создатель темы {creator} в черном списке. Пропускаю.") return None text = thread_info.get('first_post', {}).get('post_body_html', '') if 'quoteContainer hideContainer' in text: text_inside_quote = text.split('<blockquote class="quoteContainer hideContainer">')[-1].split('</blockquote>')[0] cleaned_text = clean(text_inside_quote).replace('Нажмите, чтобы раскрыть...', '').strip() if cleaned_text: return cleaned_text else: logging.warning(f"Извлеченный скрытый контент из темы #{thread_id} оказался пустым.") return None logging.info(f"В теме #{thread_id} не найден скрытый контент (цитата).") return None async def get_user(self, user_id): logging.info(f"Получаю информацию о пользователе #{user_id}") response = await self.forum.users.get(user_id=user_id) return response.json() def clean(raw_html): first_str = re.compile('<.*?>') cleaned_text = re.sub(first_str, '', raw_html) return cleaned_text async def pool(lol): try: conversations_response = await lol.get_conversations() conversations = conversations_response.get('conversations', []) if not conversations: logging.info("Не найдено активных бесед для проверки.") return await asyncio.sleep(0.5) for elem in conversations: conv_id = elem['conversation_id'] logging.info(f"--- Обрабатываю беседу #{conv_id} с {elem.get('creator_username')} ---") last_message = elem.get("last_message", {}) if last_message.get("creator_username") == Lolz.USERNAME: logging.info(f"Последнее сообщение в беседе #{conv_id} уже от меня. Пропускаю.") continue messages_response = await lol.get_all_msg(conv_id) messages = messages_response.get('messages', []) if not messages: continue await asyncio.sleep(0.5) msg_to_process = None for msg in messages: if msg.get('creator_username') != Lolz.USERNAME: msg_to_process = msg break if not msg_to_process: logging.info(f"В беседе #{conv_id} не найдено новых сообщений от других.") continue logging.info(f"Найдено новое сообщение #{msg_to_process.get('message_id')} для обработки.") if re.search(r'threads/(\d+)', msg_to_process.get('message_body', '')): user_info_response = await lol.get_user(elem['creator_user_id']) user_data = user_info_response.get('user', {}) await asyncio.sleep(0.5) if user_data.get('user_like_count', 100) < Lolz.LIKES_LIM or user_data.get('message_count', 100) < Lolz.MESSAGES_LIM: logging.info(f"Пользователь {user_data.get('username')} пропущен по лимитам.") continue reply_msg = await lol.get_thread(msg_to_process) await asyncio.sleep(0.5) if reply_msg is None: continue await lol.send_msg(conv_id, reply_msg) await asyncio.sleep(10) thanks_msg = "Спасибо, что зашел ко мне в тему. Отпиши в тему, что получил личный и обязательно свое мнение/впечатление о теме (необязательно положительное)" await lol.send_msg(conv_id, thanks_msg) await asyncio.sleep(10) else: logging.info(f"В сообщении #{msg_to_process.get('message_id')} не найдена ссылка на тему.") except Exception as e: logging.critical(f"Произошла критическая ошибка в функции pool: {e}", exc_info=True) async def main(): lol = Lolz() while True: logging.info("--- Начало нового цикла проверки ---") await pool(lol) logging.info(f"--- Цикл завершен, ожидаю 120 секунд ---") await asyncio.sleep(120) if __name__ == '__main__': if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Скрипт остановлен вручную.") sys.exit(0) Python import asyncio import logging import re import sys from time import sleep # --- Настройка логирования в файл --- logger = logging.getLogger() logger.setLevel(logging.INFO) for handler in logger.handlers[:]: logger.removeHandler(handler) file_handler = logging.FileHandler('log.txt', mode='w', encoding='utf-8') formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) logger.addHandler(file_handler) # --- Конец настройки --- try: from LOLZTEAM.Client import Forum except ImportError: logging.error("Библиотека LOLZTEAM не установлена. Выполните: pip install LOLZTEAM") sys.exit() class Lolz: USERNAME = "Победилдо" BLACK_LIST = [] LIKES_LIM = 0 MESSAGES_LIM = 0 # Ваш токен TOKEN = "" def __init__(self): self.forum = Forum(token=self.TOKEN) self.forum.settings.logger.disable() self.forum.settings.delay.enable() logging.info("Клиент Lolz инициализирован.") async def send_msg(self, conversation_id, msg): """ ИСПРАВЛЕНО: Отправляет сообщение через универсальный метод forum.request с правильным параметром 'message_body' вместо 'message'. """ logging.info(f"Отправляю сообщение в беседу #{conversation_id}. Содержимое: '{msg[:75].replace(chr(10), ' ')}...'") # Формируем данные с правильным ключом 'message_body' payload = { 'conversation_id': conversation_id, 'message_body': msg } # Используем универсальный метод request, передавая данные как form-data response = await self.forum.request("POST", "/conversation-messages", data=payload) logging.info(f"Ответ от API: Статус {response.status_code}") try: response_json = response.json() logging.info(f"Ответ от API (JSON): {response_json}") # Проверяем, что в ответе теперь есть созданное сообщение if "message" in response_json: logging.info(f"Сообщение #{response_json['message']['message_id']} успешно создано.") elif "error" in response_json or "errors" in response_json: error_message = response_json.get("error") or response_json.get("errors") logging.error(f"!!! API ВЕРНУЛО ОШИБКУ: {error_message} !!!") else: logging.warning("API не вернуло ни сообщение, ни ошибку. Ответ был странным.") except Exception: logging.error(f"Не удалось прочитать JSON из ответа API. Тело ответа: {response.text}") return response async def get_all_msg(self, conversation_id): logging.info(f"Получаю сообщения из беседы #{conversation_id}") response = await self.forum.request("GET", f"/conversations/{conversation_id}/messages") return response.json() async def get_conversations(self): logging.info("Получаю список бесед...") response = await self.forum.conversations.list() if response.status_code == 200: logging.info(f"Список бесед успешно получен. Найдено {len(response.json().get('conversations', []))} бесед.") else: logging.error(f"Ошибка при получении бесед: {response.status_code} - {response.text}") return response.json() async def get_thread_info(self, tid): logging.info(f"Получаю информацию о теме #{tid}") response = await self.forum.threads.get(thread_id=tid) return response.json() async def get_thread(self, m): thread_id_match = re.search(r"threads/(\d+)", m['message_body']) if not thread_id_match: return None thread_id = int(thread_id_match.group(1)) thread_info_response = await self.get_thread_info(thread_id) thread_info = thread_info_response.get('thread') if not thread_info: logging.warning(f"Не удалось получить информацию для темы #{thread_id}") return None creator = thread_info.get('creator_username') if creator != self.USERNAME and creator in self.BLACK_LIST: logging.info(f"Создатель темы {creator} в черном списке. Пропускаю.") return None text = thread_info.get('first_post', {}).get('post_body_html', '') if 'quoteContainer hideContainer' in text: text_inside_quote = text.split('<blockquote class="quoteContainer hideContainer">')[-1].split('</blockquote>')[0] cleaned_text = clean(text_inside_quote).replace('Нажмите, чтобы раскрыть...', '').strip() if cleaned_text: return cleaned_text else: logging.warning(f"Извлеченный скрытый контент из темы #{thread_id} оказался пустым.") return None logging.info(f"В теме #{thread_id} не найден скрытый контент (цитата).") return None async def get_user(self, user_id): logging.info(f"Получаю информацию о пользователе #{user_id}") response = await self.forum.users.get(user_id=user_id) return response.json() def clean(raw_html): first_str = re.compile('<.*?>') cleaned_text = re.sub(first_str, '', raw_html) return cleaned_text async def pool(lol): try: conversations_response = await lol.get_conversations() conversations = conversations_response.get('conversations', []) if not conversations: logging.info("Не найдено активных бесед для проверки.") return await asyncio.sleep(0.5) for elem in conversations: conv_id = elem['conversation_id'] logging.info(f"--- Обрабатываю беседу #{conv_id} с {elem.get('creator_username')} ---") last_message = elem.get("last_message", {}) if last_message.get("creator_username") == Lolz.USERNAME: logging.info(f"Последнее сообщение в беседе #{conv_id} уже от меня. Пропускаю.") continue messages_response = await lol.get_all_msg(conv_id) messages = messages_response.get('messages', []) if not messages: continue await asyncio.sleep(0.5) msg_to_process = None for msg in messages: if msg.get('creator_username') != Lolz.USERNAME: msg_to_process = msg break if not msg_to_process: logging.info(f"В беседе #{conv_id} не найдено новых сообщений от других.") continue logging.info(f"Найдено новое сообщение #{msg_to_process.get('message_id')} для обработки.") if re.search(r'threads/(\d+)', msg_to_process.get('message_body', '')): user_info_response = await lol.get_user(elem['creator_user_id']) user_data = user_info_response.get('user', {}) await asyncio.sleep(0.5) if user_data.get('user_like_count', 100) < Lolz.LIKES_LIM or user_data.get('message_count', 100) < Lolz.MESSAGES_LIM: logging.info(f"Пользователь {user_data.get('username')} пропущен по лимитам.") continue reply_msg = await lol.get_thread(msg_to_process) await asyncio.sleep(0.5) if reply_msg is None: continue await lol.send_msg(conv_id, reply_msg) await asyncio.sleep(10) thanks_msg = "Спасибо, что зашел ко мне в тему. Отпиши в тему, что получил личный и обязательно свое мнение/впечатление о теме (необязательно положительное)" await lol.send_msg(conv_id, thanks_msg) await asyncio.sleep(10) else: logging.info(f"В сообщении #{msg_to_process.get('message_id')} не найдена ссылка на тему.") except Exception as e: logging.critical(f"Произошла критическая ошибка в функции pool: {e}", exc_info=True) async def main(): lol = Lolz() while True: logging.info("--- Начало нового цикла проверки ---") await pool(lol) logging.info(f"--- Цикл завершен, ожидаю 120 секунд ---") await asyncio.sleep(120) if __name__ == '__main__': if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) try: asyncio.run(main()) except KeyboardInterrupt: logging.info("Скрипт остановлен вручную.") sys.exit(0)
Фича, апи сообщений обновилось, доки обновятся ближе к 8 по мск. Вот что возвращает 200ск, а не 404 на путь которого не существует это уже не Фича Эти юзай /conversations/{id}/messages /conversations/{id}/messages/{msid}