import asyncio
import json, os
from decimal import getcontext, Decimal
from dotenv import load_dotenv
from pytonapi import AsyncTonapi
from solana.rpc.async_api import AsyncClient
from solana.exceptions import SolanaRpcException
from solana.rpc.types import Pubkey
from spl.token.instructions import get_associated_token_address
load_dotenv()
tonapi = AsyncTonapi(api_key=os.getenv('API_KEY_TON'))
async def check_payment_ton(amount: str):
getcontext().prec = 18
your_amount_dec = Decimal(amount)
your_amount_nano = int((your_amount_dec * Decimal(10 ** 9)).to_integral_value())
transactions = await tonapi.accounts.get_events(account_id=os.getenv('TON_ADDRESS'), limit=15)
for tx in transactions.events:
if tx.actions[0].TonTransfer is None:
continue
if tx.actions[0].TonTransfer.amount == your_amount_nano:
return True
async def check_payment_sol(amount: str, client: AsyncClient):
ata = get_associated_token_address(mint=Pubkey.from_string(os.getenv('MINT_TOKEN_ADDRESS')), owner=Pubkey.from_string(os.getenv('ADDRESS_SOL')))
getcontext().prec = 18
your_amount_dec = Decimal(amount)
bal_info = await client.get_token_account_balance(ata, commitment="confirmed")
decimals = bal_info.value.decimals
your_amount_nano = int((your_amount_dec * Decimal(10 ** decimals)).to_integral_value())
sigs = await client.get_signatures_for_address(ata, limit=10)
for sig in sigs.value:
await asyncio.sleep(0.5)
while True:
try:
transaction = await client.get_transaction(sig.signature, encoding="jsonParsed",
max_supported_transaction_version=0)
instructions = transaction.value.transaction.transaction.message.instructions
for index, instr in enumerate(instructions):
data_instr = json.loads(instr.to_json())
if data_instr.get("program") != "spl-token":
continue
if data_instr['parsed']['info']['destination'] == str(ata) and \
data_instr['parsed']['info']['tokenAmount']['amount'] == str(your_amount_nano):
return True
break
except SolanaRpcException as e:
await asyncio.sleep(5)
except Exception as e:
return False
return False