Думаю не у одного меня архив был забит разной рекламой когда для доступа к боту просят подписаться на канал, перейти в ебаного бота или ещё какая-то хуйня Короче кратко, теперь есть скрипт который удалит всё нахуй из архива За то что могут снести аккаунт, ответственность не несу. У меня не сносило. Со своих аккаунтов вышел с более чем 155 каналов только ЕСЛИ ЕСТЬ ПРЕДЛОЖЕНИЯ ПО УЛУЧШЕНИЮ, ПИШИТЕ Превью Для работы надо: * Закинуть сессию в папку со скриптом (можно не одну) --> Или если сессий неету сркипт предложит создать её * Выбрать чё-то Тут выбрать Code: import sys import os import glob import asyncio import concurrent.futures import signal from typing import Dict, Callable, List, Optional, Set, Tuple from telethon.sync import TelegramClient from telethon.tl.types import User, Channel, Chat, InputPeerUser, Dialog, DialogFilter from telethon.tl.functions.channels import LeaveChannelRequest from telethon.tl.functions.messages import DeleteHistoryRequest from telethon.tl.functions.contacts import BlockRequest from telethon.errors import SessionPasswordNeededError, FloodWaitError from colorama import init, Fore, Style import time from collections import defaultdict import platform import random init(autoreset=True) ANDROID_DEVICES = [ ("SM-S918B", "Samsung Galaxy S23 Ultra", "13", "TQ3C.230901.001.B1"), ("SM-X916B", "Samsung Galaxy Tab S9 Ultra", "13", "TQ3C.230901.001.B1"), ("SM-F946B", "Samsung Galaxy Z Fold5", "13", "TQ3C.230901.001.B1"), ("M2102K1G", "Xiaomi 12 Pro", "13", "TQ3C.230901.001.B1"), ("2201122G", "Xiaomi 12", "13", "TQ3C.230901.001.B1"), ("CPH2305", "OPPO Find X5 Pro", "13", "TQ3C.230901.001.B1"), ("V2145A", "vivo X80 Pro", "13", "TQ3C.230901.001.B1"), ("LE2121", "OnePlus 9 Pro", "13", "TQ3C.230901.001.B1"), ("ASUS_I005DA", "ASUS ROG Phone 6", "13", "TQ3C.230901.001.B1") ] def get_random_device(): device = random.choice(ANDROID_DEVICES) sdk_version = 33 build_number = device[3] return { "device_model": device[1], "system_version": f"Android {device[2]} (SDK {sdk_version}; {build_number})", "app_version": "9.7.3", "lang_code": "ru", "system_lang_code": "ru" } class TelegramCleaner: def __init__(self, session_name: str) -> None: self.api_id: int = 2040 self.api_hash: str = "b18441a1ff607e10a989891a5462e627" self.session: str = session_name self.client: Optional[TelegramClient] = None self.bot_usernames: Set[str] = set() self.max_retries: int = 3 self.retry_delay: float = 1.0 self.batch_size: int = 10 self.concurrent_tasks: int = 5 self.device_info = get_random_device() def cleanup_session_file(self) -> None: session_file = f"{self.session}.session" try: if os.path.exists(session_file): os.remove(session_file) except Exception: time.sleep(1) try: if os.path.exists(session_file): os.remove(session_file) except Exception as e: print(f"{Fore.RED}[-] Не удалось удалить файл сессии: {e}{Style.RESET_ALL}") async def create_session(self) -> bool: try: if os.path.exists(f"{self.session}.session"): self.cleanup_session_file() self.client = TelegramClient( self.session, self.api_id, self.api_hash, device_model=self.device_info["device_model"], system_version=self.device_info["system_version"], app_version=self.device_info["app_version"], lang_code=self.device_info["lang_code"], system_lang_code=self.device_info["system_lang_code"] ) await self.client.connect() if not await self.client.is_user_authorized(): print(f"{Fore.YELLOW}[i] Создание новой сессии...{Style.RESET_ALL}") while True: try: phone = input(f"{Fore.GREEN}Введите номер телефона (+7XXXXXXXXXX): {Style.RESET_ALL}") await self.client.send_code_request(phone) verification_code = input(f"{Fore.GREEN}Введите код подтверждения: {Style.RESET_ALL}") try: await self.client.sign_in(phone, verification_code) except SessionPasswordNeededError: password = input(f"{Fore.YELLOW}Введите пароль двухфакторной аутентификации: {Style.RESET_ALL}") await self.client.sign_in(password=password) print(f"{Fore.GREEN}[+] Сессия успешно создана{Style.RESET_ALL}") return True except Exception as e: print(f"{Fore.RED}[-] Ошибка при создании сессии: {e}{Style.RESET_ALL}") retry = input(f"{Fore.YELLOW}Хотите попробовать снова? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': self.cleanup_session_file() return False return True except Exception as e: print(f"{Fore.RED}[-] Ошибка подключения: {e}{Style.RESET_ALL}") self.cleanup_session_file() return False finally: if self.client and self.client.is_connected(): await self.client.disconnect() def is_bot(self, entity: User) -> bool: if not isinstance(entity, User): return False if getattr(entity, 'bot', False): return True if entity.username: username = entity.username.lower() if username.endswith('bot') or username.startswith('bot'): return True if entity.username and not entity.phone: if entity.username in self.bot_usernames: return True return False async def connect(self) -> bool: try: self.client = TelegramClient( self.session, self.api_id, self.api_hash, device_model=self.device_info["device_model"], system_version=self.device_info["system_version"], app_version=self.device_info["app_version"], lang_code=self.device_info["lang_code"], system_lang_code=self.device_info["system_lang_code"] ) await self.client.connect() if not await self.client.is_user_authorized(): self._print_error(f"Сессия недействительна: {self.session}") return False return True except Exception as e: self._print_error(f"Ошибка подключения {self.session}: {e}") return False def _print_success(self, message: str) -> None: print(f"{Fore.GREEN}[+] [{self.session}] {message}{Style.RESET_ALL}") def _print_error(self, message: str) -> None: print(f"{Fore.RED}[-] [{self.session}] {message}{Style.RESET_ALL}") def _print_info(self, message: str) -> None: print(f"{Fore.YELLOW}[i] [{self.session}] {message}{Style.RESET_ALL}") async def verify_action(self, entity_id: int, action_type: str) -> bool: try: dialog = await self.client.get_entity(entity_id) archived_dialogs = await self.get_archived_dialogs() return not any(d.entity.id == entity_id for d in archived_dialogs) except Exception: return True async def block_user(self, user: User) -> bool: for attempt in range(self.max_retries): try: await self.client(BlockRequest(id=user.id)) await asyncio.sleep(self.retry_delay) if await self.verify_action(user.id, "block"): return True except Exception as e: if attempt == self.max_retries - 1: self._print_error(f"Ошибка при блокировке {user.first_name}: {e}") await asyncio.sleep(self.retry_delay) return False async def delete_dialog_safe(self, entity, name: str) -> bool: for attempt in range(self.max_retries): try: await self.client.delete_dialog(entity) await asyncio.sleep(self.retry_delay) return True except Exception as e: if "not a member" in str(e).lower(): return True if attempt == self.max_retries - 1: self._print_error(f"Ошибка при удалении диалога {name}: {e}") await asyncio.sleep(self.retry_delay) return False async def get_archived_dialogs(self) -> List[Dialog]: archived = [] try: async for dialog in self.client.iter_dialogs(folder=1): archived.append(dialog) await asyncio.sleep(0.1) except Exception as e: self._print_error(f"Ошибка при получении архивных диалогов: {e}") return archived def get_entity_type_name(self, entity) -> str: if isinstance(entity, Channel): return "канал" if entity.broadcast else "группа" elif isinstance(entity, Chat): return "чат" elif isinstance(entity, User): return "бот" if self.is_bot(entity) else "пользователь" return "диалог" async def process_entity(self, dialog: Dialog, action: str) -> bool: entity_type = self.get_entity_type_name(dialog.entity) try: if action == "leave" and isinstance(dialog.entity, (Channel, Chat)): try: await self.client(LeaveChannelRequest(dialog.entity)) await asyncio.sleep(self.retry_delay) self._print_success(f"Покинут {entity_type}: {dialog.name}") return True except Exception as e: if "not a member" not in str(e).lower(): self._print_error(f"Ошибка при выходе из {dialog.name}: {e}") return False return await self.delete_dialog_safe(dialog.entity, dialog.name) except Exception as e: self._print_error(f"Ошибка при обработке {dialog.name}: {e}") return False async def process_batch(self, batch, action_type: str) -> None: tasks = [] for dialog in batch: if action_type == "leave": task = self.process_entity(dialog, "leave") elif action_type == "delete": task = self.delete_dialog_safe(dialog.entity, dialog.name) elif action_type == "block": task = self.block_user(dialog.entity) tasks.append(task) try: await asyncio.gather(*tasks) except FloodWaitError as e: self._print_error(f"Слишком много запросов, ждем {e.seconds} секунд...") await asyncio.sleep(e.seconds) except Exception as e: self._print_error(f"Ошибка при пакетной обработке: {e}") async def process_dialogs_batch(self, dialogs: List[Dialog], action_type: str) -> None: total = len(dialogs) if total == 0: return self._print_info(f"Обработка {total} диалогов...") for i, dialog in enumerate(dialogs): try: if action_type == "leave": await self.process_entity(dialog, "leave") elif action_type == "delete": await self.delete_dialog_safe(dialog.entity, dialog.name) elif action_type == "block": await self.block_user(dialog.entity) progress = min(100, round((i + 1) / total * 100)) print(f"\r{Fore.YELLOW}[i] Прогресс: {progress}%{Style.RESET_ALL}", end="") except Exception as e: self._print_error(f"Ошибка при обработке {dialog.name}: {e}") await asyncio.sleep(0.5) print() async def delete_all(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return dialogs_by_type = defaultdict(list) for dialog in archived: if isinstance(dialog.entity, User) and self.is_bot(dialog.entity): dialogs_by_type['bots'].append(dialog) elif isinstance(dialog.entity, Channel): dialogs_by_type['channels'].append(dialog) elif isinstance(dialog.entity, Chat): dialogs_by_type['chats'].append(dialog) elif isinstance(dialog.entity, User): dialogs_by_type['users'].append(dialog) if dialogs_by_type['bots']: self._print_info(f"Блокировка {len(dialogs_by_type['bots'])} ботов...") await self.process_dialogs_batch(dialogs_by_type['bots'], "block") if dialogs_by_type['channels'] or dialogs_by_type['chats']: self._print_info(f"Выход из {len(dialogs_by_type['channels']) + len(dialogs_by_type['chats'])} групп/каналов...") await self.process_dialogs_batch(dialogs_by_type['channels'] + dialogs_by_type['chats'], "leave") self._print_info("Удаление всех диалогов...") await self.process_dialogs_batch(archived, "delete") async def delete_except_users(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return non_users = [d for d in archived if not isinstance(d.entity, User) or self.is_bot(d.entity)] if non_users: self._print_info(f"Обработка {len(non_users)} диалогов...") await self.process_dialogs_batch(non_users, "leave") await self.process_dialogs_batch(non_users, "delete") async def cache_bots(self) -> None: async for dialog in self.client.iter_dialogs(folder=1): if isinstance(dialog.entity, User) and getattr(dialog.entity, 'bot', False): if dialog.entity.username: self.bot_usernames.add(dialog.entity.username.lower()) async def delete_bots(self) -> None: await self.cache_bots() archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return bots = [d for d in archived if isinstance(d.entity, User) and self.is_bot(d.entity)] if not bots: self._print_info("Боты в архиве не найдены") return self._print_info(f"Найдено {len(bots)} ботов") await self.process_dialogs_batch(bots, "block") await self.process_dialogs_batch(bots, "delete") async def delete_channels(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return channels = [d for d in archived if isinstance(d.entity, Channel) and d.entity.broadcast] if not channels: self._print_info("Каналы в архиве не найдены") return self._print_info(f"Обработка {len(channels)} каналов...") await self.process_dialogs_batch(channels, "leave") await self.process_dialogs_batch(channels, "delete") async def leave_chats(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return chats = [d for d in archived if isinstance(d.entity, (Channel, Chat)) and (not isinstance(d.entity, Channel) or not d.entity.broadcast)] if chats: self._print_info(f"Выход из {len(chats)} чатов...") await self.process_dialogs_batch(chats, "leave") await self.process_dialogs_batch(chats, "delete") async def delete_users(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return users = [d for d in archived if isinstance(d.entity, User) and not self.is_bot(d.entity)] if users: self._print_info(f"Удаление {len(users)} пользователей...") await self.process_dialogs_batch(users, "delete") async def process_session(session_name: str, action_num: str) -> None: cleaner = TelegramCleaner(session_name) try: if not await cleaner.connect(): return actions: Dict[str, Callable] = { "1": cleaner.delete_all, "2": cleaner.delete_except_users, "3": cleaner.delete_bots, "4": cleaner.delete_channels, "5": cleaner.leave_chats, "6": cleaner.delete_users } if action_num in actions: await actions[action_num]() except Exception as e: print(f"{Fore.RED}[-] [{session_name}] Критическая ошибка: {e}{Style.RESET_ALL}") finally: if cleaner.client and cleaner.client.is_connected(): await cleaner.client.disconnect() def display_menu(sessions_count: int) -> None: print(f"\n{Fore.CYAN}[*] Telegram Archive Cleaner [*]{Style.RESET_ALL}") print(f"{Fore.CYAN}[*] Эмуляция Android-устройства [*]{Style.RESET_ALL}") print(f"{Fore.YELLOW}[i] Найдено сессий: {sessions_count}{Style.RESET_ALL}") print(f"{Fore.WHITE}1. Удалить всё из архива") print("2. Удалить всё из архива кроме людей") print("3. Удалить ботов из архива") print("4. Удалить каналы из архива") print("5. Выйти с чатов из архива") print("6. Удалить людей из архива") print(f"0. Выход{Style.RESET_ALL}") def handle_exit(signum, frame): print(f"\n{Fore.YELLOW}[i] Завершение работы...{Style.RESET_ALL}") try: for task in asyncio.all_tasks(): task.cancel() except Exception: pass finally: print(f"{Fore.CYAN}[*] Программа завершена{Style.RESET_ALL}") os._exit(0) def main() -> None: signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGTERM, handle_exit) while True: try: sessions: List[str] = [f.replace(".session", "") for f in glob.glob("*.session")] if not sessions: print(f"{Fore.YELLOW}[i] Сессии не найдены. Создаём новую...{Style.RESET_ALL}") session_name = input(f"{Fore.GREEN}Введите имя для новой сессии: {Style.RESET_ALL}") try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) cleaner = TelegramCleaner(session_name) if loop.run_until_complete(cleaner.create_session()): sessions = [session_name] else: retry = input(f"{Fore.YELLOW}Хотите создать другую сессию? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': return continue except KeyboardInterrupt: handle_exit(None, None) except Exception as e: print(f"{Fore.RED}[-] Произошла ошибка: {e}{Style.RESET_ALL}") retry = input(f"{Fore.YELLOW}Хотите попробовать снова? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': return continue while True: try: display_menu(len(sessions)) choice = input(f"\n{Fore.GREEN}>>> {Style.RESET_ALL}") if choice == "0": print(f"{Fore.CYAN}[*] Программа завершена{Style.RESET_ALL}") return if choice not in ["1", "2", "3", "4", "5", "6"]: print(f"{Fore.RED}[-] Неверный выбор{Style.RESET_ALL}") continue loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) max_concurrent = min(len(sessions), 3) with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor: futures = [ loop.create_task(process_session(session, choice)) for session in sessions ] loop.run_until_complete(asyncio.gather(*futures)) except KeyboardInterrupt: handle_exit(None, None) except Exception as e: print(f"{Fore.RED}[-] Произошла ошибка: {e}{Style.RESET_ALL}") retry = input(f"{Fore.YELLOW}Продолжить работу? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': return except KeyboardInterrupt: handle_exit(None, None) if __name__ == "__main__": try: main() except KeyboardInterrupt: handle_exit(None, None) except Exception as e: print(f"{Fore.RED}[-] Критическая ошибка: {e}{Style.RESET_ALL}") finally: print(f"{Fore.CYAN}[*] Работа завершена{Style.RESET_ALL}") Python import sys import os import glob import asyncio import concurrent.futures import signal from typing import Dict, Callable, List, Optional, Set, Tuple from telethon.sync import TelegramClient from telethon.tl.types import User, Channel, Chat, InputPeerUser, Dialog, DialogFilter from telethon.tl.functions.channels import LeaveChannelRequest from telethon.tl.functions.messages import DeleteHistoryRequest from telethon.tl.functions.contacts import BlockRequest from telethon.errors import SessionPasswordNeededError, FloodWaitError from colorama import init, Fore, Style import time from collections import defaultdict import platform import random init(autoreset=True) ANDROID_DEVICES = [ ("SM-S918B", "Samsung Galaxy S23 Ultra", "13", "TQ3C.230901.001.B1"), ("SM-X916B", "Samsung Galaxy Tab S9 Ultra", "13", "TQ3C.230901.001.B1"), ("SM-F946B", "Samsung Galaxy Z Fold5", "13", "TQ3C.230901.001.B1"), ("M2102K1G", "Xiaomi 12 Pro", "13", "TQ3C.230901.001.B1"), ("2201122G", "Xiaomi 12", "13", "TQ3C.230901.001.B1"), ("CPH2305", "OPPO Find X5 Pro", "13", "TQ3C.230901.001.B1"), ("V2145A", "vivo X80 Pro", "13", "TQ3C.230901.001.B1"), ("LE2121", "OnePlus 9 Pro", "13", "TQ3C.230901.001.B1"), ("ASUS_I005DA", "ASUS ROG Phone 6", "13", "TQ3C.230901.001.B1") ] def get_random_device(): device = random.choice(ANDROID_DEVICES) sdk_version = 33 build_number = device[3] return { "device_model": device[1], "system_version": f"Android {device[2]} (SDK {sdk_version}; {build_number})", "app_version": "9.7.3", "lang_code": "ru", "system_lang_code": "ru" } class TelegramCleaner: def __init__(self, session_name: str) -> None: self.api_id: int = 2040 self.api_hash: str = "b18441a1ff607e10a989891a5462e627" self.session: str = session_name self.client: Optional[TelegramClient] = None self.bot_usernames: Set[str] = set() self.max_retries: int = 3 self.retry_delay: float = 1.0 self.batch_size: int = 10 self.concurrent_tasks: int = 5 self.device_info = get_random_device() def cleanup_session_file(self) -> None: session_file = f"{self.session}.session" try: if os.path.exists(session_file): os.remove(session_file) except Exception: time.sleep(1) try: if os.path.exists(session_file): os.remove(session_file) except Exception as e: print(f"{Fore.RED}[-] Не удалось удалить файл сессии: {e}{Style.RESET_ALL}") async def create_session(self) -> bool: try: if os.path.exists(f"{self.session}.session"): self.cleanup_session_file() self.client = TelegramClient( self.session, self.api_id, self.api_hash, device_model=self.device_info["device_model"], system_version=self.device_info["system_version"], app_version=self.device_info["app_version"], lang_code=self.device_info["lang_code"], system_lang_code=self.device_info["system_lang_code"] ) await self.client.connect() if not await self.client.is_user_authorized(): print(f"{Fore.YELLOW}[i] Создание новой сессии...{Style.RESET_ALL}") while True: try: phone = input(f"{Fore.GREEN}Введите номер телефона (+7XXXXXXXXXX): {Style.RESET_ALL}") await self.client.send_code_request(phone) verification_code = input(f"{Fore.GREEN}Введите код подтверждения: {Style.RESET_ALL}") try: await self.client.sign_in(phone, verification_code) except SessionPasswordNeededError: password = input(f"{Fore.YELLOW}Введите пароль двухфакторной аутентификации: {Style.RESET_ALL}") await self.client.sign_in(password=password) print(f"{Fore.GREEN}[+] Сессия успешно создана{Style.RESET_ALL}") return True except Exception as e: print(f"{Fore.RED}[-] Ошибка при создании сессии: {e}{Style.RESET_ALL}") retry = input(f"{Fore.YELLOW}Хотите попробовать снова? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': self.cleanup_session_file() return False return True except Exception as e: print(f"{Fore.RED}[-] Ошибка подключения: {e}{Style.RESET_ALL}") self.cleanup_session_file() return False finally: if self.client and self.client.is_connected(): await self.client.disconnect() def is_bot(self, entity: User) -> bool: if not isinstance(entity, User): return False if getattr(entity, 'bot', False): return True if entity.username: username = entity.username.lower() if username.endswith('bot') or username.startswith('bot'): return True if entity.username and not entity.phone: if entity.username in self.bot_usernames: return True return False async def connect(self) -> bool: try: self.client = TelegramClient( self.session, self.api_id, self.api_hash, device_model=self.device_info["device_model"], system_version=self.device_info["system_version"], app_version=self.device_info["app_version"], lang_code=self.device_info["lang_code"], system_lang_code=self.device_info["system_lang_code"] ) await self.client.connect() if not await self.client.is_user_authorized(): self._print_error(f"Сессия недействительна: {self.session}") return False return True except Exception as e: self._print_error(f"Ошибка подключения {self.session}: {e}") return False def _print_success(self, message: str) -> None: print(f"{Fore.GREEN}[+] [{self.session}] {message}{Style.RESET_ALL}") def _print_error(self, message: str) -> None: print(f"{Fore.RED}[-] [{self.session}] {message}{Style.RESET_ALL}") def _print_info(self, message: str) -> None: print(f"{Fore.YELLOW}[i] [{self.session}] {message}{Style.RESET_ALL}") async def verify_action(self, entity_id: int, action_type: str) -> bool: try: dialog = await self.client.get_entity(entity_id) archived_dialogs = await self.get_archived_dialogs() return not any(d.entity.id == entity_id for d in archived_dialogs) except Exception: return True async def block_user(self, user: User) -> bool: for attempt in range(self.max_retries): try: await self.client(BlockRequest(id=user.id)) await asyncio.sleep(self.retry_delay) if await self.verify_action(user.id, "block"): return True except Exception as e: if attempt == self.max_retries - 1: self._print_error(f"Ошибка при блокировке {user.first_name}: {e}") await asyncio.sleep(self.retry_delay) return False async def delete_dialog_safe(self, entity, name: str) -> bool: for attempt in range(self.max_retries): try: await self.client.delete_dialog(entity) await asyncio.sleep(self.retry_delay) return True except Exception as e: if "not a member" in str(e).lower(): return True if attempt == self.max_retries - 1: self._print_error(f"Ошибка при удалении диалога {name}: {e}") await asyncio.sleep(self.retry_delay) return False async def get_archived_dialogs(self) -> List[Dialog]: archived = [] try: async for dialog in self.client.iter_dialogs(folder=1): archived.append(dialog) await asyncio.sleep(0.1) except Exception as e: self._print_error(f"Ошибка при получении архивных диалогов: {e}") return archived def get_entity_type_name(self, entity) -> str: if isinstance(entity, Channel): return "канал" if entity.broadcast else "группа" elif isinstance(entity, Chat): return "чат" elif isinstance(entity, User): return "бот" if self.is_bot(entity) else "пользователь" return "диалог" async def process_entity(self, dialog: Dialog, action: str) -> bool: entity_type = self.get_entity_type_name(dialog.entity) try: if action == "leave" and isinstance(dialog.entity, (Channel, Chat)): try: await self.client(LeaveChannelRequest(dialog.entity)) await asyncio.sleep(self.retry_delay) self._print_success(f"Покинут {entity_type}: {dialog.name}") return True except Exception as e: if "not a member" not in str(e).lower(): self._print_error(f"Ошибка при выходе из {dialog.name}: {e}") return False return await self.delete_dialog_safe(dialog.entity, dialog.name) except Exception as e: self._print_error(f"Ошибка при обработке {dialog.name}: {e}") return False async def process_batch(self, batch, action_type: str) -> None: tasks = [] for dialog in batch: if action_type == "leave": task = self.process_entity(dialog, "leave") elif action_type == "delete": task = self.delete_dialog_safe(dialog.entity, dialog.name) elif action_type == "block": task = self.block_user(dialog.entity) tasks.append(task) try: await asyncio.gather(*tasks) except FloodWaitError as e: self._print_error(f"Слишком много запросов, ждем {e.seconds} секунд...") await asyncio.sleep(e.seconds) except Exception as e: self._print_error(f"Ошибка при пакетной обработке: {e}") async def process_dialogs_batch(self, dialogs: List[Dialog], action_type: str) -> None: total = len(dialogs) if total == 0: return self._print_info(f"Обработка {total} диалогов...") for i, dialog in enumerate(dialogs): try: if action_type == "leave": await self.process_entity(dialog, "leave") elif action_type == "delete": await self.delete_dialog_safe(dialog.entity, dialog.name) elif action_type == "block": await self.block_user(dialog.entity) progress = min(100, round((i + 1) / total * 100)) print(f"\r{Fore.YELLOW}[i] Прогресс: {progress}%{Style.RESET_ALL}", end="") except Exception as e: self._print_error(f"Ошибка при обработке {dialog.name}: {e}") await asyncio.sleep(0.5) print() async def delete_all(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return dialogs_by_type = defaultdict(list) for dialog in archived: if isinstance(dialog.entity, User) and self.is_bot(dialog.entity): dialogs_by_type['bots'].append(dialog) elif isinstance(dialog.entity, Channel): dialogs_by_type['channels'].append(dialog) elif isinstance(dialog.entity, Chat): dialogs_by_type['chats'].append(dialog) elif isinstance(dialog.entity, User): dialogs_by_type['users'].append(dialog) if dialogs_by_type['bots']: self._print_info(f"Блокировка {len(dialogs_by_type['bots'])} ботов...") await self.process_dialogs_batch(dialogs_by_type['bots'], "block") if dialogs_by_type['channels'] or dialogs_by_type['chats']: self._print_info(f"Выход из {len(dialogs_by_type['channels']) + len(dialogs_by_type['chats'])} групп/каналов...") await self.process_dialogs_batch(dialogs_by_type['channels'] + dialogs_by_type['chats'], "leave") self._print_info("Удаление всех диалогов...") await self.process_dialogs_batch(archived, "delete") async def delete_except_users(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return non_users = [d for d in archived if not isinstance(d.entity, User) or self.is_bot(d.entity)] if non_users: self._print_info(f"Обработка {len(non_users)} диалогов...") await self.process_dialogs_batch(non_users, "leave") await self.process_dialogs_batch(non_users, "delete") async def cache_bots(self) -> None: async for dialog in self.client.iter_dialogs(folder=1): if isinstance(dialog.entity, User) and getattr(dialog.entity, 'bot', False): if dialog.entity.username: self.bot_usernames.add(dialog.entity.username.lower()) async def delete_bots(self) -> None: await self.cache_bots() archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return bots = [d for d in archived if isinstance(d.entity, User) and self.is_bot(d.entity)] if not bots: self._print_info("Боты в архиве не найдены") return self._print_info(f"Найдено {len(bots)} ботов") await self.process_dialogs_batch(bots, "block") await self.process_dialogs_batch(bots, "delete") async def delete_channels(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return channels = [d for d in archived if isinstance(d.entity, Channel) and d.entity.broadcast] if not channels: self._print_info("Каналы в архиве не найдены") return self._print_info(f"Обработка {len(channels)} каналов...") await self.process_dialogs_batch(channels, "leave") await self.process_dialogs_batch(channels, "delete") async def leave_chats(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return chats = [d for d in archived if isinstance(d.entity, (Channel, Chat)) and (not isinstance(d.entity, Channel) or not d.entity.broadcast)] if chats: self._print_info(f"Выход из {len(chats)} чатов...") await self.process_dialogs_batch(chats, "leave") await self.process_dialogs_batch(chats, "delete") async def delete_users(self) -> None: archived = await self.get_archived_dialogs() if not archived: self._print_info("Архив пуст") return users = [d for d in archived if isinstance(d.entity, User) and not self.is_bot(d.entity)] if users: self._print_info(f"Удаление {len(users)} пользователей...") await self.process_dialogs_batch(users, "delete") async def process_session(session_name: str, action_num: str) -> None: cleaner = TelegramCleaner(session_name) try: if not await cleaner.connect(): return actions: Dict[str, Callable] = { "1": cleaner.delete_all, "2": cleaner.delete_except_users, "3": cleaner.delete_bots, "4": cleaner.delete_channels, "5": cleaner.leave_chats, "6": cleaner.delete_users } if action_num in actions: await actions[action_num]() except Exception as e: print(f"{Fore.RED}[-] [{session_name}] Критическая ошибка: {e}{Style.RESET_ALL}") finally: if cleaner.client and cleaner.client.is_connected(): await cleaner.client.disconnect() def display_menu(sessions_count: int) -> None: print(f"\n{Fore.CYAN}[*] Telegram Archive Cleaner [*]{Style.RESET_ALL}") print(f"{Fore.CYAN}[*] Эмуляция Android-устройства [*]{Style.RESET_ALL}") print(f"{Fore.YELLOW}[i] Найдено сессий: {sessions_count}{Style.RESET_ALL}") print(f"{Fore.WHITE}1. Удалить всё из архива") print("2. Удалить всё из архива кроме людей") print("3. Удалить ботов из архива") print("4. Удалить каналы из архива") print("5. Выйти с чатов из архива") print("6. Удалить людей из архива") print(f"0. Выход{Style.RESET_ALL}") def handle_exit(signum, frame): print(f"\n{Fore.YELLOW}[i] Завершение работы...{Style.RESET_ALL}") try: for task in asyncio.all_tasks(): task.cancel() except Exception: pass finally: print(f"{Fore.CYAN}[*] Программа завершена{Style.RESET_ALL}") os._exit(0) def main() -> None: signal.signal(signal.SIGINT, handle_exit) signal.signal(signal.SIGTERM, handle_exit) while True: try: sessions: List[str] = [f.replace(".session", "") for f in glob.glob("*.session")] if not sessions: print(f"{Fore.YELLOW}[i] Сессии не найдены. Создаём новую...{Style.RESET_ALL}") session_name = input(f"{Fore.GREEN}Введите имя для новой сессии: {Style.RESET_ALL}") try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) cleaner = TelegramCleaner(session_name) if loop.run_until_complete(cleaner.create_session()): sessions = [session_name] else: retry = input(f"{Fore.YELLOW}Хотите создать другую сессию? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': return continue except KeyboardInterrupt: handle_exit(None, None) except Exception as e: print(f"{Fore.RED}[-] Произошла ошибка: {e}{Style.RESET_ALL}") retry = input(f"{Fore.YELLOW}Хотите попробовать снова? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': return continue while True: try: display_menu(len(sessions)) choice = input(f"\n{Fore.GREEN}>>> {Style.RESET_ALL}") if choice == "0": print(f"{Fore.CYAN}[*] Программа завершена{Style.RESET_ALL}") return if choice not in ["1", "2", "3", "4", "5", "6"]: print(f"{Fore.RED}[-] Неверный выбор{Style.RESET_ALL}") continue loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) max_concurrent = min(len(sessions), 3) with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor: futures = [ loop.create_task(process_session(session, choice)) for session in sessions ] loop.run_until_complete(asyncio.gather(*futures)) except KeyboardInterrupt: handle_exit(None, None) except Exception as e: print(f"{Fore.RED}[-] Произошла ошибка: {e}{Style.RESET_ALL}") retry = input(f"{Fore.YELLOW}Продолжить работу? (y/n): {Style.RESET_ALL}").lower() if retry != 'y': return except KeyboardInterrupt: handle_exit(None, None) if __name__ == "__main__": try: main() except KeyboardInterrupt: handle_exit(None, None) except Exception as e: print(f"{Fore.RED}[-] Критическая ошибка: {e}{Style.RESET_ALL}") finally: print(f"{Fore.CYAN}[*] Работа завершена{Style.RESET_ALL}") Зависимости: pip install Telethon==1.32.1 pip install colorama==0.4.6 pip install cryptg==0.4.0 pip install asyncio==3.4.3 Python pip install Telethon==1.32.1 pip install colorama==0.4.6 pip install cryptg==0.4.0 pip install asyncio==3.4.3 Обновление 0.1: - Добавлена нормальная эмуляция устройств для уменьшения шанса на вылет сессии
Спасибо. Постоянно пользовался телеграфом на дроиде чтоб чистить каналы, теперь хоть нормальным скриптом можно))