~cytrogen/evi-run

ref: 31083a9e249db6ab007dfd8515d241a67f514219 evi-run/bot/utils/check_payment.py -rw-r--r-- 2.4 KiB
31083a9e — Bendy update README.md 5 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import asyncio
import json, os
from decimal import getcontext, Decimal

from dotenv import load_dotenv
from pytonapi import AsyncTonapi
from solana.rpc.async_api import AsyncClient
from solana.exceptions import SolanaRpcException
from solana.rpc.types import Pubkey
from spl.token.instructions import get_associated_token_address


load_dotenv()

tonapi = AsyncTonapi(api_key=os.getenv('API_KEY_TON'))


async def check_payment_ton(amount: str):
    getcontext().prec = 18
    your_amount_dec = Decimal(amount)
    your_amount_nano = int((your_amount_dec * Decimal(10 ** 9)).to_integral_value())

    transactions = await tonapi.accounts.get_events(account_id=os.getenv('TON_ADDRESS'), limit=15)
    for tx in transactions.events:
        if tx.actions[0].TonTransfer is None:
            continue
        if tx.actions[0].TonTransfer.amount == your_amount_nano:
            return True


async def check_payment_sol(amount: str, client: AsyncClient):
    ata = get_associated_token_address(mint=Pubkey.from_string(os.getenv('MINT_TOKEN_ADDRESS')), owner=Pubkey.from_string(os.getenv('ADDRESS_SOL')))

    getcontext().prec = 18
    your_amount_dec = Decimal(amount)

    bal_info = await client.get_token_account_balance(ata, commitment="confirmed")
    decimals = bal_info.value.decimals
    your_amount_nano = int((your_amount_dec * Decimal(10 ** decimals)).to_integral_value())

    sigs = await client.get_signatures_for_address(ata, limit=10)
    for sig in sigs.value:
        await asyncio.sleep(0.5)
        while True:
            try:
                transaction = await client.get_transaction(sig.signature, encoding="jsonParsed",
                                                           max_supported_transaction_version=0)
                instructions = transaction.value.transaction.transaction.message.instructions
                for index, instr in enumerate(instructions):
                    data_instr = json.loads(instr.to_json())
                    if data_instr.get("program") != "spl-token":
                        continue
                    if data_instr['parsed']['info']['destination'] == str(ata) and \
                            data_instr['parsed']['info']['tokenAmount']['amount'] == str(your_amount_nano):
                        return True
                break
            except SolanaRpcException as e:
                await asyncio.sleep(5)
            except Exception as e:
                return False
    return False