import ast from aiogram.enums import ContentType from aiogram import F from aiogram_dialog.widgets.kbd import Button, Row, Group, Radio, ManagedRadio from aiogram_dialog import Dialog, Window, ChatEvent, DialogManager from aiogram_dialog.widgets.input import MessageInput from aiogram_dialog.widgets.text import Format from aiogram_dialog.widgets.kbd import Cancel from aiogram_dialog.widgets.kbd import SwitchTo from aiogram.types import CallbackQuery, Message from solders.keypair import Keypair from solana.rpc.types import Pubkey from fluentogram import TranslatorHub from bot.dialogs.i18n_widget import I18NFormat from bot.states.states import Wallet from bot.utils.solana_funcs import get_balances from database.repositories.user import UserRepository from config import AVAILABLE_LANGUAGES_WORDS, AVAILABLE_LANGUAGES def is_int_list(text): try: value = ast.literal_eval(text) if isinstance(value, list) and all(isinstance(x, int) for x in value): if len(value) != 0: return value return False except Exception: return False async def on_cancel_wallet(callback: ChatEvent, widget: Button, manager: DialogManager): state = manager.middleware_data.get('state') await state.clear() await callback.message.delete() await manager.done() async def on_input_key(message: Message, widget: MessageInput, manager: DialogManager): if not message.text: return await manager.switch_to(state=Wallet.add_not_format) bytes_key = is_int_list(message.text) if not bytes_key: return await manager.switch_to(state=Wallet.add_not_format) user_repo: UserRepository = manager.middleware_data['user_repo'] user = manager.middleware_data['user'] try: solana_client = manager.middleware_data['solana_client'] balances, address = await get_balances(client=solana_client, secret=bytes_key) manager.dialog_data['balance_sol'] = '\n'.join(balances) manager.dialog_data['wallet_address'] = address await manager.switch_to(state=Wallet.balance_after_check) state = manager.middleware_data.get('state') await user_repo.add_wallet_key(user.telegram_id, message.text) await state.clear() except Exception as e: print(e) return await manager.switch_to(state=Wallet.add_not_format) async def on_input_key_after_not_format(message: Message, widget: MessageInput, manager: DialogManager): if not message.text: return bytes_key = is_int_list(message.text) if not bytes_key: return user_repo: UserRepository = manager.middleware_data['user_repo'] user = manager.middleware_data['user'] try: solana_client = manager.middleware_data['solana_client'] balances, address = await get_balances(client=solana_client, secret=bytes_key) manager.dialog_data['balance_sol'] = '\n'.join(balances) manager.dialog_data['wallet_address'] = address await manager.switch_to(state=Wallet.balance_after_check) state = manager.middleware_data.get('state') await user_repo.add_wallet_key(user.telegram_id, message.text) await state.clear() except Exception as e: print(e) pass async def on_delete_approve(callback: ChatEvent, widget: Button, manager: DialogManager): user_repo: UserRepository = manager.middleware_data['user_repo'] i18n = manager.middleware_data.get('i18n') user = manager.middleware_data['user'] await user_repo.delete_wallet_key(user.telegram_id) await callback.answer(text=i18n.get('command_delete_key_approve_text'), show_alert=True) state = manager.middleware_data.get('state') await state.clear() await callback.message.delete() await manager.done() async def getter_main(dialog_manager: DialogManager, **kwargs): user_repo: UserRepository = dialog_manager.middleware_data['user_repo'] user = dialog_manager.middleware_data['user'] wallet = await user_repo.get_wallet(user.telegram_id) return {'is_wallet': True if wallet else False} async def getter_balance(dialog_manager: DialogManager, **kwargs): user_repo: UserRepository = dialog_manager.middleware_data['user_repo'] user = dialog_manager.middleware_data['user'] wallet = await user_repo.get_wallet(user.telegram_id) wallet = is_int_list(wallet) solana_client = dialog_manager.middleware_data['solana_client'] balances, address = await get_balances(client=solana_client, secret=wallet) dialog_manager.dialog_data['balance_sol'] = '\n'.join(balances) dialog_manager.dialog_data['wallet_address'] = address state = dialog_manager.middleware_data.get('state') await state.clear() return {'balance_sol': wallet} dialog = Dialog( Window( I18NFormat('cmd_wallet_text_start'), Group( SwitchTo( I18NFormat('wallet_balance_kb'), id='wallet_balance', state=Wallet.balance ), SwitchTo( I18NFormat('wallet_delete_key'), id='wallet_delete', state=Wallet.delete ), width=2, when=F['is_wallet'] ), Cancel(I18NFormat('close_kb'), id='cancel_wallet', on_click=on_cancel_wallet), MessageInput( content_types=[ContentType.ANY], func=on_input_key ), state=Wallet.main, getter=getter_main ), Window( I18NFormat('not_format_wallet_key'), MessageInput( content_types=[ContentType.ANY], func=on_input_key ), Cancel(I18NFormat('close_kb'), id='cancel_wallet', on_click=on_cancel_wallet), state=Wallet.add_not_format ), Window( I18NFormat('text_after_add_key') + Format(' {dialog_data[wallet_address]}'), Format('{dialog_data[balance_sol]}'), SwitchTo( I18NFormat('wallet_delete_key'), id='wallet_delete', state=Wallet.delete ), Cancel(I18NFormat('close_kb'), id='cancel_wallet', on_click=on_cancel_wallet), state=Wallet.balance_after_check ), Window( I18NFormat('wallet_delete_key_text'), Button( I18NFormat('command_new_approve_kb'), id='wallet_delete_approve', on_click=on_delete_approve ), Cancel(I18NFormat('close_kb'), id='cancel_wallet', on_click=on_cancel_wallet), state=Wallet.delete ), Window( I18NFormat('text_balance_wallet') + Format(' {dialog_data[wallet_address]}'), Format('{dialog_data[balance_sol]}'), SwitchTo( I18NFormat('wallet_delete_key'), id='wallet_delete', state=Wallet.delete ), Cancel(I18NFormat('close_kb'), id='cancel_wallet', on_click=on_cancel_wallet), state=Wallet.balance, getter=getter_balance ) )