Загрузка...

Скрипт Удалятор всего из архива

Тема в разделе Python создана пользователем NotPixel 1 июн 2025 в 19:24. (поднята 5 июн 2025 в 21:46) 171 просмотр

Загрузка...
  1. NotPixel
    NotPixel Автор темы 1 июн 2025 в 19:24 2149 30 янв 2023
    Думаю не у одного меня архив был забит разной рекламой когда для доступа к боту просят подписаться на канал, перейти в ебаного бота или ещё какая-то хуйня

    Короче кратко, теперь есть скрипт который удалит всё нахуй из архива

    За то что могут снести аккаунт, ответственность не несу. У меня не сносило. Со своих аккаунтов вышел с более чем 155 каналов только

    ЕСЛИ ЕСТЬ ПРЕДЛОЖЕНИЯ ПО УЛУЧШЕНИЮ, ПИШИТЕ
    [IMG]


    Для работы надо:
    * Закинуть сессию в папку со скриптом (можно не одну) --> Или если сессий неету сркипт предложит создать её
    * Выбрать чё-то
    [IMG]


    Code:
    Python
    import sys
    import os
    import glob
    import asyncio
    import concurrent.futures
    import signal
    from typing import Dict, Callable, List, Optional, Set, Tuple
    from telethon.sync import TelegramClient
    from telethon.tl.types import User, Channel, Chat, InputPeerUser, Dialog, DialogFilter
    from telethon.tl.functions.channels import LeaveChannelRequest
    from telethon.tl.functions.messages import DeleteHistoryRequest
    from telethon.tl.functions.contacts import BlockRequest
    from telethon.errors import SessionPasswordNeededError, FloodWaitError
    from colorama import init, Fore, Style
    import time
    from collections import defaultdict
    import platform
    import random

    init(autoreset=True)

    ANDROID_DEVICES = [
    ("SM-S918B", "Samsung Galaxy S23 Ultra", "13", "TQ3C.230901.001.B1"),
    ("SM-X916B", "Samsung Galaxy Tab S9 Ultra", "13", "TQ3C.230901.001.B1"),
    ("SM-F946B", "Samsung Galaxy Z Fold5", "13", "TQ3C.230901.001.B1"),
    ("M2102K1G", "Xiaomi 12 Pro", "13", "TQ3C.230901.001.B1"),
    ("2201122G", "Xiaomi 12", "13", "TQ3C.230901.001.B1"),
    ("CPH2305", "OPPO Find X5 Pro", "13", "TQ3C.230901.001.B1"),
    ("V2145A", "vivo X80 Pro", "13", "TQ3C.230901.001.B1"),
    ("LE2121", "OnePlus 9 Pro", "13", "TQ3C.230901.001.B1"),
    ("ASUS_I005DA", "ASUS ROG Phone 6", "13", "TQ3C.230901.001.B1")
    ]

    def get_random_device():
    device = random.choice(ANDROID_DEVICES)
    sdk_version = 33
    build_number = device[3]
    return {
    "device_model": device[1],
    "system_version": f"Android {device[2]} (SDK {sdk_version}; {build_number})",
    "app_version": "9.7.3",
    "lang_code": "ru",
    "system_lang_code": "ru"
    }

    class TelegramCleaner:
    def __init__(self, session_name: str) -> None:
    self.api_id: int = 2040
    self.api_hash: str = "b18441a1ff607e10a989891a5462e627"
    self.session: str = session_name
    self.client: Optional[TelegramClient] = None
    self.bot_usernames: Set[str] = set()
    self.max_retries: int = 3
    self.retry_delay: float = 1.0
    self.batch_size: int = 10
    self.concurrent_tasks: int = 5
    self.device_info = get_random_device()

    def cleanup_session_file(self) -> None:
    session_file = f"{self.session}.session"
    try:
    if os.path.exists(session_file):
    os.remove(session_file)
    except Exception:
    time.sleep(1)
    try:
    if os.path.exists(session_file):
    os.remove(session_file)
    except Exception as e:
    print(f"{Fore.RED}[-] Не удалось удалить файл сессии: {e}{Style.RESET_ALL}")

    async def create_session(self) -> bool:
    try:
    if os.path.exists(f"{self.session}.session"):
    self.cleanup_session_file()

    self.client = TelegramClient(
    self.session,
    self.api_id,
    self.api_hash,
    device_model=self.device_info["device_model"],
    system_version=self.device_info["system_version"],
    app_version=self.device_info["app_version"],
    lang_code=self.device_info["lang_code"],
    system_lang_code=self.device_info["system_lang_code"]
    )

    await self.client.connect()

    if not await self.client.is_user_authorized():
    print(f"{Fore.YELLOW}[i] Создание новой сессии...{Style.RESET_ALL}")
    while True:
    try:
    phone = input(f"{Fore.GREEN}Введите номер телефона (+7XXXXXXXXXX): {Style.RESET_ALL}")
    await self.client.send_code_request(phone)
    verification_code = input(f"{Fore.GREEN}Введите код подтверждения: {Style.RESET_ALL}")

    try:
    await self.client.sign_in(phone, verification_code)
    except SessionPasswordNeededError:
    password = input(f"{Fore.YELLOW}Введите пароль двухфакторной аутентификации: {Style.RESET_ALL}")
    await self.client.sign_in(password=password)

    print(f"{Fore.GREEN}[+] Сессия успешно создана{Style.RESET_ALL}")
    return True

    except Exception as e:
    print(f"{Fore.RED}[-] Ошибка при создании сессии: {e}{Style.RESET_ALL}")
    retry = input(f"{Fore.YELLOW}Хотите попробовать снова? (y/n): {Style.RESET_ALL}").lower()
    if retry != 'y':
    self.cleanup_session_file()
    return False
    return True

    except Exception as e:
    print(f"{Fore.RED}[-] Ошибка подключения: {e}{Style.RESET_ALL}")
    self.cleanup_session_file()
    return False
    finally:
    if self.client and self.client.is_connected():
    await self.client.disconnect()

    def is_bot(self, entity: User) -> bool:
    if not isinstance(entity, User):
    return False

    if getattr(entity, 'bot', False):
    return True

    if entity.username:
    username = entity.username.lower()
    if username.endswith('bot') or username.startswith('bot'):
    return True

    if entity.username and not entity.phone:
    if entity.username in self.bot_usernames:
    return True

    return False

    async def connect(self) -> bool:
    try:
    self.client = TelegramClient(
    self.session,
    self.api_id,
    self.api_hash,
    device_model=self.device_info["device_model"],
    system_version=self.device_info["system_version"],
    app_version=self.device_info["app_version"],
    lang_code=self.device_info["lang_code"],
    system_lang_code=self.device_info["system_lang_code"]
    )
    await self.client.connect()
    if not await self.client.is_user_authorized():
    self._print_error(f"Сессия недействительна: {self.session}")
    return False
    return True
    except Exception as e:
    self._print_error(f"Ошибка подключения {self.session}: {e}")
    return False

    def _print_success(self, message: str) -> None:
    print(f"{Fore.GREEN}[+] [{self.session}] {message}{Style.RESET_ALL}")

    def _print_error(self, message: str) -> None:
    print(f"{Fore.RED}[-] [{self.session}] {message}{Style.RESET_ALL}")

    def _print_info(self, message: str) -> None:
    print(f"{Fore.YELLOW}[i] [{self.session}] {message}{Style.RESET_ALL}")

    async def verify_action(self, entity_id: int, action_type: str) -> bool:
    try:
    dialog = await self.client.get_entity(entity_id)
    archived_dialogs = await self.get_archived_dialogs()
    return not any(d.entity.id == entity_id for d in archived_dialogs)
    except Exception:
    return True

    async def block_user(self, user: User) -> bool:
    for attempt in range(self.max_retries):
    try:
    await self.client(BlockRequest(id=user.id))
    await asyncio.sleep(self.retry_delay)
    if await self.verify_action(user.id, "block"):
    return True
    except Exception as e:
    if attempt == self.max_retries - 1:
    self._print_error(f"Ошибка при блокировке {user.first_name}: {e}")
    await asyncio.sleep(self.retry_delay)
    return False

    async def delete_dialog_safe(self, entity, name: str) -> bool:
    for attempt in range(self.max_retries):
    try:
    await self.client.delete_dialog(entity)
    await asyncio.sleep(self.retry_delay)
    return True
    except Exception as e:
    if "not a member" in str(e).lower():
    return True
    if attempt == self.max_retries - 1:
    self._print_error(f"Ошибка при удалении диалога {name}: {e}")
    await asyncio.sleep(self.retry_delay)
    return False

    async def get_archived_dialogs(self) -> List[Dialog]:
    archived = []
    try:
    async for dialog in self.client.iter_dialogs(folder=1):
    archived.append(dialog)
    await asyncio.sleep(0.1)
    except Exception as e:
    self._print_error(f"Ошибка при получении архивных диалогов: {e}")
    return archived

    def get_entity_type_name(self, entity) -> str:
    if isinstance(entity, Channel):
    return "канал" if entity.broadcast else "группа"
    elif isinstance(entity, Chat):
    return "чат"
    elif isinstance(entity, User):
    return "бот" if self.is_bot(entity) else "пользователь"
    return "диалог"

    async def process_entity(self, dialog: Dialog, action: str) -> bool:
    entity_type = self.get_entity_type_name(dialog.entity)

    try:
    if action == "leave" and isinstance(dialog.entity, (Channel, Chat)):
    try:
    await self.client(LeaveChannelRequest(dialog.entity))
    await asyncio.sleep(self.retry_delay)
    self._print_success(f"Покинут {entity_type}: {dialog.name}")
    return True
    except Exception as e:
    if "not a member" not in str(e).lower():
    self._print_error(f"Ошибка при выходе из {dialog.name}: {e}")
    return False

    return await self.delete_dialog_safe(dialog.entity, dialog.name)
    except Exception as e:
    self._print_error(f"Ошибка при обработке {dialog.name}: {e}")
    return False

    async def process_batch(self, batch, action_type: str) -> None:
    tasks = []
    for dialog in batch:
    if action_type == "leave":
    task = self.process_entity(dialog, "leave")
    elif action_type == "delete":
    task = self.delete_dialog_safe(dialog.entity, dialog.name)
    elif action_type == "block":
    task = self.block_user(dialog.entity)
    tasks.append(task)

    try:
    await asyncio.gather(*tasks)
    except FloodWaitError as e:
    self._print_error(f"Слишком много запросов, ждем {e.seconds} секунд...")
    await asyncio.sleep(e.seconds)
    except Exception as e:
    self._print_error(f"Ошибка при пакетной обработке: {e}")

    async def process_dialogs_batch(self, dialogs: List[Dialog], action_type: str) -> None:
    total = len(dialogs)
    if total == 0:
    return

    self._print_info(f"Обработка {total} диалогов...")

    for i, dialog in enumerate(dialogs):
    try:
    if action_type == "leave":
    await self.process_entity(dialog, "leave")
    elif action_type == "delete":
    await self.delete_dialog_safe(dialog.entity, dialog.name)
    elif action_type == "block":
    await self.block_user(dialog.entity)

    progress = min(100, round((i + 1) / total * 100))
    print(f"\r{Fore.YELLOW}[i] Прогресс: {progress}%{Style.RESET_ALL}", end="")

    except Exception as e:
    self._print_error(f"Ошибка при обработке {dialog.name}: {e}")

    await asyncio.sleep(0.5)

    print()

    async def delete_all(self) -> None:
    archived = await self.get_archived_dialogs()
    if not archived:
    self._print_info("Архив пуст")
    return

    dialogs_by_type = defaultdict(list)
    for dialog in archived:
    if isinstance(dialog.entity, User) and self.is_bot(dialog.entity):
    dialogs_by_type['bots'].append(dialog)
    elif isinstance(dialog.entity, Channel):
    dialogs_by_type['channels'].append(dialog)
    elif isinstance(dialog.entity, Chat):
    dialogs_by_type['chats'].append(dialog)
    elif isinstance(dialog.entity, User):
    dialogs_by_type['users'].append(dialog)

    if dialogs_by_type['bots']:
    self._print_info(f"Блокировка {len(dialogs_by_type['bots'])} ботов...")
    await self.process_dialogs_batch(dialogs_by_type['bots'], "block")

    if dialogs_by_type['channels'] or dialogs_by_type['chats']:
    self._print_info(f"Выход из {len(dialogs_by_type['channels']) + len(dialogs_by_type['chats'])} групп/каналов...")
    await self.process_dialogs_batch(dialogs_by_type['channels'] + dialogs_by_type['chats'], "leave")

    self._print_info("Удаление всех диалогов...")
    await self.process_dialogs_batch(archived, "delete")

    async def delete_except_users(self) -> None:
    archived = await self.get_archived_dialogs()
    if not archived:
    self._print_info("Архив пуст")
    return

    non_users = [d for d in archived if not isinstance(d.entity, User) or self.is_bot(d.entity)]
    if non_users:
    self._print_info(f"Обработка {len(non_users)} диалогов...")
    await self.process_dialogs_batch(non_users, "leave")
    await self.process_dialogs_batch(non_users, "delete")

    async def cache_bots(self) -> None:
    async for dialog in self.client.iter_dialogs(folder=1):
    if isinstance(dialog.entity, User) and getattr(dialog.entity, 'bot', False):
    if dialog.entity.username:
    self.bot_usernames.add(dialog.entity.username.lower())

    async def delete_bots(self) -> None:
    await self.cache_bots()
    archived = await self.get_archived_dialogs()
    if not archived:
    self._print_info("Архив пуст")
    return

    bots = [d for d in archived if isinstance(d.entity, User) and self.is_bot(d.entity)]
    if not bots:
    self._print_info("Боты в архиве не найдены")
    return

    self._print_info(f"Найдено {len(bots)} ботов")
    await self.process_dialogs_batch(bots, "block")
    await self.process_dialogs_batch(bots, "delete")

    async def delete_channels(self) -> None:
    archived = await self.get_archived_dialogs()
    if not archived:
    self._print_info("Архив пуст")
    return

    channels = [d for d in archived if isinstance(d.entity, Channel) and d.entity.broadcast]
    if not channels:
    self._print_info("Каналы в архиве не найдены")
    return

    self._print_info(f"Обработка {len(channels)} каналов...")
    await self.process_dialogs_batch(channels, "leave")
    await self.process_dialogs_batch(channels, "delete")

    async def leave_chats(self) -> None:
    archived = await self.get_archived_dialogs()
    if not archived:
    self._print_info("Архив пуст")
    return

    chats = [d for d in archived if isinstance(d.entity, (Channel, Chat)) and
    (not isinstance(d.entity, Channel) or not d.entity.broadcast)]
    if chats:
    self._print_info(f"Выход из {len(chats)} чатов...")
    await self.process_dialogs_batch(chats, "leave")
    await self.process_dialogs_batch(chats, "delete")

    async def delete_users(self) -> None:
    archived = await self.get_archived_dialogs()
    if not archived:
    self._print_info("Архив пуст")
    return

    users = [d for d in archived if isinstance(d.entity, User) and not self.is_bot(d.entity)]
    if users:
    self._print_info(f"Удаление {len(users)} пользователей...")
    await self.process_dialogs_batch(users, "delete")

    async def process_session(session_name: str, action_num: str) -> None:
    cleaner = TelegramCleaner(session_name)
    try:
    if not await cleaner.connect():
    return

    actions: Dict[str, Callable] = {
    "1": cleaner.delete_all,
    "2": cleaner.delete_except_users,
    "3": cleaner.delete_bots,
    "4": cleaner.delete_channels,
    "5": cleaner.leave_chats,
    "6": cleaner.delete_users
    }

    if action_num in actions:
    await actions[action_num]()
    except Exception as e:
    print(f"{Fore.RED}[-] [{session_name}] Критическая ошибка: {e}{Style.RESET_ALL}")
    finally:
    if cleaner.client and cleaner.client.is_connected():
    await cleaner.client.disconnect()

    def display_menu(sessions_count: int) -> None:
    print(f"\n{Fore.CYAN}[*] Telegram Archive Cleaner [*]{Style.RESET_ALL}")
    print(f"{Fore.CYAN}[*] Эмуляция Android-устройства [*]{Style.RESET_ALL}")
    print(f"{Fore.YELLOW}[i] Найдено сессий: {sessions_count}{Style.RESET_ALL}")
    print(f"{Fore.WHITE}1. Удалить всё из архива")
    print("2. Удалить всё из архива кроме людей")
    print("3. Удалить ботов из архива")
    print("4. Удалить каналы из архива")
    print("5. Выйти с чатов из архива")
    print("6. Удалить людей из архива")
    print(f"0. Выход{Style.RESET_ALL}")

    def handle_exit(signum, frame):
    print(f"\n{Fore.YELLOW}[i] Завершение работы...{Style.RESET_ALL}")
    try:
    for task in asyncio.all_tasks():
    task.cancel()
    except Exception:
    pass
    finally:
    print(f"{Fore.CYAN}[*] Программа завершена{Style.RESET_ALL}")
    os._exit(0)

    def main() -> None:
    signal.signal(signal.SIGINT, handle_exit)
    signal.signal(signal.SIGTERM, handle_exit)

    while True:
    try:
    sessions: List[str] = [f.replace(".session", "") for f in glob.glob("*.session")]

    if not sessions:
    print(f"{Fore.YELLOW}[i] Сессии не найдены. Создаём новую...{Style.RESET_ALL}")
    session_name = input(f"{Fore.GREEN}Введите имя для новой сессии: {Style.RESET_ALL}")

    try:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    cleaner = TelegramCleaner(session_name)
    if loop.run_until_complete(cleaner.create_session()):
    sessions = [session_name]
    else:
    retry = input(f"{Fore.YELLOW}Хотите создать другую сессию? (y/n): {Style.RESET_ALL}").lower()
    if retry != 'y':
    return
    continue
    except KeyboardInterrupt:
    handle_exit(None, None)
    except Exception as e:
    print(f"{Fore.RED}[-] Произошла ошибка: {e}{Style.RESET_ALL}")
    retry = input(f"{Fore.YELLOW}Хотите попробовать снова? (y/n): {Style.RESET_ALL}").lower()
    if retry != 'y':
    return
    continue

    while True:
    try:
    display_menu(len(sessions))
    choice = input(f"\n{Fore.GREEN}>>> {Style.RESET_ALL}")

    if choice == "0":
    print(f"{Fore.CYAN}[*] Программа завершена{Style.RESET_ALL}")
    return

    if choice not in ["1", "2", "3", "4", "5", "6"]:
    print(f"{Fore.RED}[-] Неверный выбор{Style.RESET_ALL}")
    continue

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    max_concurrent = min(len(sessions), 3)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
    futures = [
    loop.create_task(process_session(session, choice))
    for session in sessions
    ]
    loop.run_until_complete(asyncio.gather(*futures))

    except KeyboardInterrupt:
    handle_exit(None, None)
    except Exception as e:
    print(f"{Fore.RED}[-] Произошла ошибка: {e}{Style.RESET_ALL}")
    retry = input(f"{Fore.YELLOW}Продолжить работу? (y/n): {Style.RESET_ALL}").lower()
    if retry != 'y':
    return
    except KeyboardInterrupt:
    handle_exit(None, None)

    if __name__ == "__main__":
    try:
    main()
    except KeyboardInterrupt:
    handle_exit(None, None)
    except Exception as e:
    print(f"{Fore.RED}[-] Критическая ошибка: {e}{Style.RESET_ALL}")
    finally:
    print(f"{Fore.CYAN}[*] Работа завершена{Style.RESET_ALL}")
    Зависимости:
    Python
    pip install Telethon==1.32.1
    pip install colorama==0.4.6
    pip install cryptg==0.4.0
    pip install asyncio==3.4.3
    - Добавлена нормальная эмуляция устройств для уменьшения шанса на вылет сессии
     
    1 июн 2025 в 19:24 Изменено
  2. virtualhost
    легенда бро
     
  3. Магнит
    Магнит 1 июн 2025 в 19:26 гость —> :cai::cai::cai: 764 18 дек 2021
    Спасибо. Постоянно пользовался телеграфом на дроиде чтоб чистить каналы, теперь хоть нормальным скриптом можно))
     
    1. 005
      Магнит, ник крутой, но оформление аниме хуйня
  4. кошак
    кошак 1 июн 2025 в 19:26 Заблокирован(а) 455 13 июл 2021
    бля ты гений просто, я так страдал просто я в ахуе
     
  5. AkNma
    Бля я все интимки удалил, а скрипт работает
     
  6. GUCCI
    GUCCI 1 июн 2025 в 19:34 ЛУЧШИЕ ВЕРИФИКАЦИИ - https://lolz.live/threads/4228395/ 10 036 26 апр 2017
    сессию не сносит?
     
    1. NotPixel Автор темы
      GUCCI, у меня не сносило пока тестил, на 3
    2. NotPixel Автор темы
      GUCCI, на основе вышел с 155 каналов
  7. PowerDevil
    PowerDevil 1 июн 2025 в 20:30 11 957 27 авг 2022
    плохо что не передаешь девайс и сдк сессии умирать будут
     
    1. NotPixel Автор темы
      PowerDevil, спасибо за совет. Исправил
Top