Всем привет Я уже 3 дня сижу мучаюсь над одним кодом и одной ошибкой Все используемые библиотеки: aiogram 3.0 beem Задача такова: Это бот, который отслеживает все транзакции на кошельке HIVE. Когда бот фиксирует транзакцию с криптовалютой HIVE, он отправляет информацию об этой транзакции всем пользователям бота. Сообщение, которое бот отправляет пользователю, содержит данные о транзакции и включает кнопку Отправить. Если пользователь нажимает эту кнопку, бот автоматически отправляет средства обратно на кошелёк отправителя. Например: Пользователь1 отправляет криптовалюту HIVE на кошелёк администратора. Бот фиксирует транзакцию и отправляет сообщение с её деталями пользователям. Если администратор нажимает кнопку "Отправить" в сообщении, бот возвращает криптовалюту обратно на кошелёк Пользователя1. Вот весь код который у меня сейчас есть import logging import structlog from aiogram.types import Message from beem import Hive from beem.blockchain import Blockchain from beem.account import Account from aiogram import Bot, Dispatcher from aiogram.filters import Command from asyncio import Queue from concurrent.futures import ThreadPoolExecutor import asyncio # Логирование logging.basicConfig(level=logging.DEBUG) # Установлен уровень DEBUG для подробного логирования structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer() ], logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, ) logger = structlog.get_logger() # Конфигурация HIVE_ACCOUNT = ("" "") HIVE_POSTING_KEY = "" ADMIN_CHAT_ID = "" TELEGRAM_BOT_TOKEN = "" # Проверяем доступность узлов Hive try: hive = Hive(node=["https://api.hive.blog", "https://anyx.io", "https://rpc.ecency.com"]) logger.debug("Hive instance создан успешно.") blockchain_info = hive.info() logger.info(f"Доступ к блокчейну Hive есть: {blockchain_info}") except Exception as e: logger.error(f"Ошибка подключения к Hive: {e}") exit(1) # Проверяем доступ к аккаунту try: account = Account(HIVE_ACCOUNT, blockchain_instance=hive) logger.info(f"Успешное подключение к аккаунту Hive: {account}") except Exception as e: logger.error(f"Ошибка подключения к аккаунту Hive: {e}") exit(1) async def simulate_transaction(queue): # Пример искусственной транзакции fake_transaction = { "type": "transfer", "data": { "from": "test_sender", "amount": "10.000 HIVE", "memo": "Тестовая заметка" } } # Логируем и добавляем тестовую транзакцию в очередь logger.info("Симуляция тестовой транзакции...") await queue.put(fake_transaction) # Обработчик команды /start async def welcome(message: Message): user_name = message.from_user.first_name logger.info(f"Приветственное сообщение отправлено пользователю {user_name}") await message.reply(f"Привет, {user_name}! Я отслеживаю транзакции на Hive.") def hive_stream_sync(blockchain, known_transactions, queue): logger.info("Запуск синхронного стриминга Hive...") for block in blockchain.stream(): block_num = block.get('block_num') logger.info(f"Получен блок: {block_num}") logger.debug(f"Содержимое блока: {block}") # Лог всего блока для подтверждения transactions = block.get("transactions", []) for tx in transactions: transaction_id = tx.get("transaction_id") logger.debug(f"Обработка транзакции: {transaction_id}") # Лог начала обработки транзакции if transaction_id in known_transactions: logger.debug(f"Транзакция {transaction_id} уже известна, пропускаю.") continue known_transactions.add(transaction_id) logger.debug( f"Добавлено в known_transactions, всего известных транзакций: {len(known_transactions)}") # Подтверждение добавления for op in tx.get("operations", []): if len(op) < 2: logger.warning("Некорректная операция в транзакции.") continue op_type, op_data = op logger.debug(f"Обработка операции: тип={op_type}, данные={op_data}") # Лог операции if op_type in ["transfer", "vote", "comment"]: queue.put_nowait({"type": op_type, "data": op_data}) logger.info(f"Операция добавлена в очередь: {op_type}") # Обработчик мониторинга транзакций async def monitor_hive_transactions(queue: Queue, known_transactions: set): hive = Hive(node=["https://api.hive.blog", "https://anyx.io", "https://rpc.ecency.com"]) blockchain = Blockchain(blockchain_instance=hive) loop = asyncio.get_event_loop() with ThreadPoolExecutor() as executor: await loop.run_in_executor(executor, hive_stream_sync, blockchain, known_transactions, queue) # Обработчик транзакций async def hive_transaction_handler(queue: Queue, bot: Bot): while True: transaction = await queue.get() tx_type = transaction["type"] data = transaction["data"] logger.debug(f"Получена транзакция из очереди: {tx_type} с данными: {data}") try: if tx_type == "transfer": sender = data.get("from", "(неизвестно)") amount = data.get("amount", "(неизвестно)") memo = data.get("memo", "(нет заметки)") logger.info(f"Обработка перевода: отправитель={sender}, сумма={amount}, заметка={memo}") await bot.send_message( chat_id=ADMIN_CHAT_ID, text=(f" *Новый перевод!*\n\n" f"Сумма: `{amount}`\n" f"Отправитель: {sender}\n" f"Заметка: `{memo}`"), parse_mode="Markdown", ) elif tx_type == "vote": voter = data.get("voter", "(неизвестно)") author = data.get("author", "(неизвестно)") permlink = data.get("permlink", "(нет ссылки)") weight = data.get("weight", 0) / 100 logger.info(f"Обработка голоса: голосующий={voter}, автор={author}, ссылка={permlink}, вес={weight}%") await bot.send_message( chat_id=ADMIN_CHAT_ID, text=(f" *Новый голос!*\n\n" f"Голосующий: {voter}\n" f"Автор: {author}\n" f"Пост: `{permlink}`\n" f"Вес: `{weight}%`"), parse_mode="Markdown", ) elif tx_type == "comment": author = data.get("author", "(неизвестно)") permlink = data.get("permlink", "(нет ссылки)") title = data.get("title", "(без заголовка)") logger.info(f"Обработка комментария: автор={author}, ссылка={permlink}, заголовок={title}") await bot.send_message( chat_id=ADMIN_CHAT_ID, text=(f" *Новый комментарий!*\n\n" f"Автор: {author}\n" f"Заголовок: `{title}`\n" f"Ссылка: `{permlink}`"), parse_mode="Markdown", ) except Exception as e: logger.error(f"Ошибка обработки транзакции {tx_type}: {e}") async def start_monitoring(queue: Queue, bot: Bot): known_transactions = set() logger.info("Запуск мониторинга транзакций Hive...") # Запускаем симуляцию тестовой транзакции await simulate_transaction(queue) # Собираем обработчики await asyncio.gather( hive_transaction_handler(queue, bot), monitor_hive_transactions(queue, known_transactions), ) # Инициализация бота async def create_bot_and_dispatcher(): bot = Bot(token=TELEGRAM_BOT_TOKEN) dp = Dispatcher() dp.message.register(welcome, Command("start")) return bot, dp # Обработчик запуска async def on_startup(bot, queue): logger.info("Бот успешно запущен.") await start_monitoring(queue, bot) # Основной метод async def main(): bot, dp = await create_bot_and_dispatcher() queue = Queue() try: logger.info("Запуск long polling...") await dp.start_polling(bot, on_startup=lambda: on_startup(bot, queue)) except Exception as e: logger.error(f"Критическая ошибка: {e}") finally: await bot.session.close() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Бот остановлен вручную.") Python import logging import structlog from aiogram.types import Message from beem import Hive from beem.blockchain import Blockchain from beem.account import Account from aiogram import Bot, Dispatcher from aiogram.filters import Command from asyncio import Queue from concurrent.futures import ThreadPoolExecutor import asyncio # Логирование logging.basicConfig(level=logging.DEBUG) # Установлен уровень DEBUG для подробного логирования structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.JSONRenderer() ], logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, ) logger = structlog.get_logger() # Конфигурация HIVE_ACCOUNT = ("" "") HIVE_POSTING_KEY = "" ADMIN_CHAT_ID = "" TELEGRAM_BOT_TOKEN = "" # Проверяем доступность узлов Hive try: hive = Hive(node=["https://api.hive.blog", "https://anyx.io", "https://rpc.ecency.com"]) logger.debug("Hive instance создан успешно.") blockchain_info = hive.info() logger.info(f"Доступ к блокчейну Hive есть: {blockchain_info}") except Exception as e: logger.error(f"Ошибка подключения к Hive: {e}") exit(1) # Проверяем доступ к аккаунту try: account = Account(HIVE_ACCOUNT, blockchain_instance=hive) logger.info(f"Успешное подключение к аккаунту Hive: {account}") except Exception as e: logger.error(f"Ошибка подключения к аккаунту Hive: {e}") exit(1) async def simulate_transaction(queue): # Пример искусственной транзакции fake_transaction = { "type": "transfer", "data": { "from": "test_sender", "amount": "10.000 HIVE", "memo": "Тестовая заметка" } } # Логируем и добавляем тестовую транзакцию в очередь logger.info("Симуляция тестовой транзакции...") await queue.put(fake_transaction) # Обработчик команды /start async def welcome(message: Message): user_name = message.from_user.first_name logger.info(f"Приветственное сообщение отправлено пользователю {user_name}") await message.reply(f"Привет, {user_name}! Я отслеживаю транзакции на Hive.") def hive_stream_sync(blockchain, known_transactions, queue): logger.info("Запуск синхронного стриминга Hive...") for block in blockchain.stream(): block_num = block.get('block_num') logger.info(f"Получен блок: {block_num}") logger.debug(f"Содержимое блока: {block}") # Лог всего блока для подтверждения transactions = block.get("transactions", []) for tx in transactions: transaction_id = tx.get("transaction_id") logger.debug(f"Обработка транзакции: {transaction_id}") # Лог начала обработки транзакции if transaction_id in known_transactions: logger.debug(f"Транзакция {transaction_id} уже известна, пропускаю.") continue known_transactions.add(transaction_id) logger.debug( f"Добавлено в known_transactions, всего известных транзакций: {len(known_transactions)}") # Подтверждение добавления for op in tx.get("operations", []): if len(op) < 2: logger.warning("Некорректная операция в транзакции.") continue op_type, op_data = op logger.debug(f"Обработка операции: тип={op_type}, данные={op_data}") # Лог операции if op_type in ["transfer", "vote", "comment"]: queue.put_nowait({"type": op_type, "data": op_data}) logger.info(f"Операция добавлена в очередь: {op_type}") # Обработчик мониторинга транзакций async def monitor_hive_transactions(queue: Queue, known_transactions: set): hive = Hive(node=["https://api.hive.blog", "https://anyx.io", "https://rpc.ecency.com"]) blockchain = Blockchain(blockchain_instance=hive) loop = asyncio.get_event_loop() with ThreadPoolExecutor() as executor: await loop.run_in_executor(executor, hive_stream_sync, blockchain, known_transactions, queue) # Обработчик транзакций async def hive_transaction_handler(queue: Queue, bot: Bot): while True: transaction = await queue.get() tx_type = transaction["type"] data = transaction["data"] logger.debug(f"Получена транзакция из очереди: {tx_type} с данными: {data}") try: if tx_type == "transfer": sender = data.get("from", "(неизвестно)") amount = data.get("amount", "(неизвестно)") memo = data.get("memo", "(нет заметки)") logger.info(f"Обработка перевода: отправитель={sender}, сумма={amount}, заметка={memo}") await bot.send_message( chat_id=ADMIN_CHAT_ID, text=(f" *Новый перевод!*\n\n" f"Сумма: `{amount}`\n" f"Отправитель: {sender}\n" f"Заметка: `{memo}`"), parse_mode="Markdown", ) elif tx_type == "vote": voter = data.get("voter", "(неизвестно)") author = data.get("author", "(неизвестно)") permlink = data.get("permlink", "(нет ссылки)") weight = data.get("weight", 0) / 100 logger.info(f"Обработка голоса: голосующий={voter}, автор={author}, ссылка={permlink}, вес={weight}%") await bot.send_message( chat_id=ADMIN_CHAT_ID, text=(f" *Новый голос!*\n\n" f"Голосующий: {voter}\n" f"Автор: {author}\n" f"Пост: `{permlink}`\n" f"Вес: `{weight}%`"), parse_mode="Markdown", ) elif tx_type == "comment": author = data.get("author", "(неизвестно)") permlink = data.get("permlink", "(нет ссылки)") title = data.get("title", "(без заголовка)") logger.info(f"Обработка комментария: автор={author}, ссылка={permlink}, заголовок={title}") await bot.send_message( chat_id=ADMIN_CHAT_ID, text=(f" *Новый комментарий!*\n\n" f"Автор: {author}\n" f"Заголовок: `{title}`\n" f"Ссылка: `{permlink}`"), parse_mode="Markdown", ) except Exception as e: logger.error(f"Ошибка обработки транзакции {tx_type}: {e}") async def start_monitoring(queue: Queue, bot: Bot): known_transactions = set() logger.info("Запуск мониторинга транзакций Hive...") # Запускаем симуляцию тестовой транзакции await simulate_transaction(queue) # Собираем обработчики await asyncio.gather( hive_transaction_handler(queue, bot), monitor_hive_transactions(queue, known_transactions), ) # Инициализация бота async def create_bot_and_dispatcher(): bot = Bot(token=TELEGRAM_BOT_TOKEN) dp = Dispatcher() dp.message.register(welcome, Command("start")) return bot, dp # Обработчик запуска async def on_startup(bot, queue): logger.info("Бот успешно запущен.") await start_monitoring(queue, bot) # Основной метод async def main(): bot, dp = await create_bot_and_dispatcher() queue = Queue() try: logger.info("Запуск long polling...") await dp.start_polling(bot, on_startup=lambda: on_startup(bot, queue)) except Exception as e: logger.error(f"Критическая ошибка: {e}") finally: await bot.session.close() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Бот остановлен вручную.") (Да код написан частично с помощью CHATGPT, потому-что я думал что он поможет мне пофиксить ошибку) Забыл указать что ошибка заключается в том что бот не отслеживает транзакции по кошельку