Загрузка...

Зашифрованное хранилище | Telegram Bot

Тема в разделе Python создана пользователем Yablloko 16 мар 2025. (поднята 24 апр 2025) 286 просмотров

Загрузка...
  1. Yablloko
    Yablloko Автор темы 16 мар 2025 85 21 янв 2018
    Зашифрованное хранилище
    Это Telegram-бот, который позволяет безопасно шифровать, хранить и расшифровывать файлы — будь то фото, видео, ********* или голосовые сообщения. Бот генерирует уникальный AES‑ключ на основе введённого мастер‑пароля, шифрует им файлы и сохраняет их в зашифрованном виде. Для доступа к содержимому требуется ввод правильного мастер‑пароля, что обеспечивает высокую защиту даже при возможном компрометации сервера. Проект прост в использовании и гарантирует, что ваши данные останутся в безопасности, если кто-то попробует их расшифровать без вашего пароля.

    /key — Установить новый ключ (только админ).
    /set — Загрузить зашифрованный ключ в память (админ вводит мастер‑пароль).
    /del — Удалить ключ из памяти вручную.
    /all — Запросить мастер‑пароль и расшифровать файлы.
    1. Установить новый ключ:
    Админ отправляет команду /key
    Вводит «новый ключ» (это используется для генерации AES‑ключа).
    Вводит мастер‑пароль для шифрования ключа.
    Бот сохраняет зашифрованный ключ, который теперь будет использоваться для шифрования файлов.

    ПЕРЕЗАПУСКАЕМ БОТА

    2. Загрузить ключ в память:
    После перезапуска бота отправьте команду /set.
    Введите мастер‑пароль для дешифровки ключа.
    Если пароль верный, ключ будет загружен в память и бот сможет шифровать/расшифровывать файлы.

    3. Отправка файлов:
    Просто отправьте боту фото, видео, документ или голосовое сообщение.
    Бот зашифрует содержимое вместе с именем файла и сохранит его в хранилище.

    4. Расшифровка файлов:
    Отправьте команду /all.
    Введите мастер‑пароль.
    Если пароль верный, бот расшифрует файлы и отправит их обратно с оригинальными именами.
    После этого ключ удаляется из памяти для безопасности.

    5. Удаление ключа из памяти вручную:
    Используйте команду /del, если хотите принудительно удалить ключ из памяти.
    Python
    pip install aiogram cryptography structlog asyncio hashlib ctypes os logging
    Python
    import os

    import logging

    import hashlib

    import asyncio

    import ctypes

    import struct

    import unicodedata

    import unicodedata, re

    from cryptography.hazmat.primitives.ciphers.aead import AESGCM

    from aiogram import Bot, Dispatcher, Router, F

    from aiogram.types import Message, FSInputFile

    from aiogram.filters import Command

    from aiogram.enums import ParseMode

    from aiogram.client.default import DefaultBotProperties

    from aiogram.fsm.context import FSMContext

    from aiogram.fsm.state import State, StatesGroup

    logging.basicConfig(level=logging.INFO)

    TOKEN = "токен"

    ADMIN_ID = 8039488237

    KEY_ENCRYPTION_FILE = "key_enc.bin"

    STORAGE_DIR = "encrypted_files"

    INACTIVITY_TIMEOUT = 300

    if not os.path.exists(STORAGE_DIR)

    os.makedirs(STORAGE_DIR)

    ENCRYPTION_KEY = None

    inactivity_timer = None

    router = Router()
    def normalize_filename(filename: str) -> str:

    base, ext = os.path.splitext(filename)



    base = unicodedata.normalize('NFC', base)



    base = re.sub(r'[^\w\s-]', '', base).strip()



    base = re.sub(r'[-\s]+', '-', base)

    return base + ext

    def encrypt_key(master_password: str, key: bytes) -> bytes:

    salt = os.urandom(16)

    master_key = hashlib.pbkdf2_hmac("sha256", master_password.encode(), salt, 100000)

    aesgcm = AESGCM(master_key)

    nonce = os.urandom(12)

    encrypted_key = aesgcm.encrypt(nonce, key, None)

    return salt + nonce + encrypted_key

    def decrypt_key(master_password: str, encrypted_data: bytes) -> bytes:

    salt = encrypted_data[:16]

    nonce = encrypted_data[16:28]

    encrypted_key = encrypted_data[28:]

    master_key = hashlib.pbkdf2_hmac("sha256", master_password.encode(), salt, 100000)

    aesgcm = AESGCM(master_key)

    return aesgcm.decrypt(nonce, encrypted_key, None)

    inactivity_timer = None

    def schedule_key_deletion():

    global inactivity_timer

    if inactivity_timer:

    inactivity_timer.cancel()

    inactivity_timer = asyncio.create_task(clear_key_after_delay())

    async def clear_key_after_delay():

    await asyncio.sleep(INACTIVITY_TIMEOUT)

    global ENCRYPTION_KEY

    ENCRYPTION_KEY = None

    logging.info("Ключ удалён из памяти по таймауту.")

    def cancel_key_deletion():

    global inactivity_timer

    if inactivity_timer:

    inactivity_timer.cancel()

    inactivity_timer = None

    class KeyStates(StatesGroup):

    waiting_for_key = State()

    waiting_for_password = State()

    @router.message(Command("key"))

    async def start_set_key(message: Message, state: FSMContext):

    if os.path.exists(KEY_ENCRYPTION_FILE) and message.from_user.id != ADMIN_ID:

    await message.answer("⚠ Ключ уже установлен. Только админ может его изменить.")

    return

    await message.answer("Введите новый ключ:")

    await state.set_state(KeyStates.waiting_for_key)

    @router.message(KeyStates.waiting_for_key)

    async def process_set_key(message: Message, state: FSMContext):

    await state.update_data(new_key=message.text.strip())

    await message.answer("🔑 Введите мастер-пароль для шифрования ключа:")

    await state.set_state(KeyStates.waiting_for_password)

    @router.message(KeyStates.waiting_for_password)

    async def receive_master_password_for_set(message: Message, state: FSMContext):

    global ENCRYPTION_KEY

    data = await state.get_data()

    new_key = data.get("new_key")

    if not new_key:

    await message.answer("❌ Ошибка: ключ не был передан. Попробуйте снова, начав с /key.")

    await state.clear()

    return

    master_password = message.text.strip()

    for file in os.listdir(STORAGE_DIR):

    os.remove(os.path.join(STORAGE_DIR, file))

    salt_for_kdf = os.urandom(16)

    aes_key = hashlib.pbkdf2_hmac("sha256", new_key.encode(), salt_for_kdf, 100000)

    ENCRYPTION_KEY = aes_key

    encrypted_key = salt_for_kdf + encrypt_key(master_password, aes_key)

    with open(KEY_ENCRYPTION_FILE, "wb") as f:

    f.write(encrypted_key)

    await message.answer("✅ Ключ установлен.")

    cancel_key_deletion()

    schedule_key_deletion()

    await state.clear()

    class LoadKeyState(StatesGroup):

    waiting_for_load_password = State()

    class LoadKeyState(StatesGroup):

    waiting_for_load_password = State()

    @router.message(Command("set"))

    async def load_key_command(message: Message, state: FSMContext):

    if message.from_user.id != ADMIN_ID:

    await message.answer("⚠ Только админ может загрузить ключ.")

    return

    await message.answer("Введите мастер пароль для загрузки ключа:")

    await state.set_state(LoadKeyState.waiting_for_load_password)

    @router.message(LoadKeyState.waiting_for_load_password)

    async def process_load_key(message: Message, state: FSMContext):

    global ENCRYPTION_KEY

    master_password = message.text.strip()

    try:

    with open(KEY_ENCRYPTION_FILE, "rb") as f:

    data = f.read()



    decrypted_key = decrypt_key(master_password, data[16:])

    ENCRYPTION_KEY = decrypted_key

    await message.answer("✅ Ключ успешно загружен в память.")

    cancel_key_deletion()

    schedule_key_deletion()

    except Exception as e:

    logging.error(f"Ошибка загрузки ключа: {e}")

    await message.answer("❌ Неверный мастер пароль.")

    await state.clear()

    @router.message(Command("del"))

    async def delete_key(message: Message):

    global ENCRYPTION_KEY

    if message.from_user.id != ADMIN_ID:

    await message.answer("⚠ Только админ может удалить ключ из памяти.")

    return

    ENCRYPTION_KEY = None

    cancel_key_deletion()

    await message.answer("✅ Ключ удалён из памяти.")

    class AllDecryptionState(StatesGroup):

    waiting_for_all_password = State()

    @router.message(Command("all"))

    async def all_command(message: Message, state: FSMContext):

    await message.answer("🔑 Введите мастер пароль для расшифровки файлов:")

    await state.set_state(AllDecryptionState.waiting_for_all_password)

    @router.message(AllDecryptionState.waiting_for_all_password)

    async def process_all_command(message: Message, state: FSMContext, bot: Bot):

    master_password = message.text.strip()

    try:

    with open(KEY_ENCRYPTION_FILE, "rb") as f:

    data = f.read()

    salt_for_kdf = data[:16]

    encrypted_key_part = data[16:]

    decryption_key = decrypt_key(master_password, encrypted_key_part)

    except Exception as e:

    logging.error(f"Ошибка расшифровки ключа при /all: {e}")

    await message.answer("❌ Неверный мастер пароль.")

    await state.clear()

    return

    files = os.listdir(STORAGE_DIR)

    if not files:

    await message.answer("📂 В хранилище нет файлов.")

    await state.clear()

    return

    for file in files:

    if not file.endswith(".enc"):

    continue

    file_path = os.path.join(STORAGE_DIR, file)

    try:

    with open(file_path, "rb") as f:

    name_len_bytes = f.read(4)

    name_len = struct.unpack(">I", name_len_bytes)[0]

    file_type_byte = f.read(1)

    file_type = struct.unpack("B", file_type_byte)[0]

    nonce_name = f.read(12)

    encrypted_filename = f.read(name_len)

    nonce_content = f.read(12)

    ciphertext = f.read()

    aesgcm = AESGCM(decryption_key)

    original_filename = aesgcm.decrypt(nonce_name, encrypted_filename, None).decode("utf-8")

    base, ext = os.path.splitext(original_filename)

    normalized_name = re.sub(r'[^\w\s-]', '', unicodedata.normalize('NFC', base)).strip()

    normalized_name = re.sub(r'[-\s]+', '-', normalized_name)

    original_filename = normalized_name + ext

    plaintext = aesgcm.decrypt(nonce_content, ciphertext, None)

    except Exception as e:

    logging.error(f"Ошибка расшифровки файла {file}: {e}")

    await message.answer(f"❌ Ошибка при расшифровке файла {file}")

    continue

    decrypted_file_path = os.path.join(STORAGE_DIR, f"decrypted_{original_filename}")

    with open(decrypted_file_path, "wb") as f:

    f.write(plaintext)

    if file_type == 1:

    await bot.send_photo(message.chat.id, photo=FSInputFile(decrypted_file_path))

    elif file_type == 2:

    await bot.send_video(message.chat.id, video=FSInputFile(decrypted_file_path))

    elif file_type == 3:

    await bot.send_voice(message.chat.id, voice=FSInputFile(decrypted_file_path))

    elif file_type == 4:

    await bot.send_video_note(message.chat.id, video_note=FSInputFile(decrypted_file_path))

    elif file_type == 5:

    await bot.send_document(message.chat.id, document=FSInputFile(decrypted_file_path))

    else:

    await bot.send_document(message.chat.id, document=FSInputFile(decrypted_file_path))

    os.remove(decrypted_file_path)

    await message.answer("✅ Файлы расшифрованы и отправлены.")

    ENCRYPTION_KEY = None

    cancel_key_deletion()

    await state.clear()

    @router.message(F.photo | F.video | F.voice | F.video_note | F.document)

    async def handle_file(message: Message, bot: Bot):

    if ENCRYPTION_KEY is None:

    await message.answer("⚠ Ключ не загружен в память. Установите его через /key или загрузите через /set.")

    return

    file_id = None

    original_filename = None

    file_type = None

    if message.photo:

    file_type = 1

    file_id = message.photo[-1].file_id

    original_filename = f"{file_id}.jpg"

    elif message.video:

    file_type = 2

    file_id = message.video.file_id

    file_info = await bot.get_file(file_id)

    original_filename = os.path.basename(file_info.file_path)

    elif message.voice:

    file_type = 3

    file_id = message.voice.file_id

    original_filename = f"{file_id}.ogg"

    elif message.video_note:

    file_type = 4

    file_id = message.video_note.file_id

    original_filename = f"{file_id}.mp4"

    elif message.document:

    file_type = 5

    file_id = message.document.file_id

    original_filename = message.document.file_name if message.document.file_name else f"{file_id}"

    else:

    await message.answer("⚠ Неизвестный тип файла.")

    return

    original_filename = normalize_filename(original_filename)

    file_info = await bot.get_file(file_id)

    file_bytes = await bot.download_file(file_info.file_path)

    if hasattr(file_bytes, "read"):

    file_bytes = file_bytes.read()

    aesgcm = AESGCM(ENCRYPTION_KEY)

    nonce_name = os.urandom(12)

    nonce_content = os.urandom(12)

    encrypted_filename = aesgcm.encrypt(nonce_name, original_filename.encode("utf-8"), None)

    ciphertext = aesgcm.encrypt(nonce_content, file_bytes, None)

    name_len_bytes = struct.pack(">I", len(encrypted_filename))

    file_type_byte = struct.pack("B", file_type)

    data_to_write = name_len_bytes + file_type_byte + nonce_name + encrypted_filename + nonce_content + ciphertext

    encrypted_filename_on_disk = f"{file_id}.enc"

    file_path = os.path.join(STORAGE_DIR, encrypted_filename_on_disk)

    with open(file_path, "wb") as f:

    f.write(data_to_write)

    await message.answer("✅ Файл зашифрован и сохранён.")

    schedule_key_deletion()

    async def main():

    bot = Bot(token=TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))

    dp = Dispatcher()

    dp.include_router(router)

    await bot.delete_webhook(drop_pending_updates=True)

    await dp.start_polling(bot)

    if __name__ == "__main__":

    asyncio.run(main())
     
  2. детектив
    детектив 17 мар 2025 elvis has left the building 260 19 ноя 2019
    и нахуя
     
    1. Yablloko Автор темы
  3. Солнцеестояние
    что по pep8 ? больно смотреть, а так пойдет вроде
     
  4. braconn1er
    braconn1er 19 мар 2025 22 8 окт 2023
    То есть, чтобы не хранить файлы у себя на сервере, можно у себя на сервере запустить бота, и хранить файлы на диске сервера бота??

    Не проще ли просто шифровать файлы локально через age или gpg и загрузить в saved messages или специальную группу, где только 1 участник?
     
Top