import base64, json, uuid
import os
from io import BytesIO
from typing import Optional
import aiofiles
from agents.mcp import MCPServerStdio
from aiogram import Bot
from aiogram.types import BufferedInputFile
from redis.asyncio.client import Redis
from agents import Runner, RunConfig
from dataclasses import dataclass
from bot.agents_tools.agents_ import client, create_main_agent, memory_creator_agent
from database.models import User
from database.repositories.user import UserRepository
from database.repositories.utils import UtilsRepository
from config import ADMIN_ID
@dataclass
class AnswerText:
    """Result of ``text_request``: the agent's answer plus token accounting."""
    # Final output of the agent run (``runner.final_output``).
    answer: str
    # Contents of the image file referenced by the Redis ``image_<telegram_id>``
    # entry, or None when no such entry existed after the run.
    image_bytes: Optional[bytes]
    # Sum of ``usage.input_tokens`` over all raw responses of the run.
    input_tokens: int
    # ``input_tokens`` value taken from the Redis image entry; 0 when there is none.
    input_tokens_image: int
    # Sum of ``usage.output_tokens`` over all raw responses of the run.
    output_tokens: int
    # ``output_tokens`` value taken from the Redis image entry; 0 when there is none.
    output_tokens_image: int
@dataclass
class AnswerImage:
    """Result of ``image_request``: the agent's answer to an incoming image."""
    # Final output of the agent run (``runner.final_output``).
    answer: str
    # Sum of ``usage.input_tokens`` over all raw responses of the run.
    input_tokens: int
    # Sum of ``usage.output_tokens`` over all raw responses of the run.
    output_tokens: int
    # Path under ``images/`` where the incoming image bytes were written.
    image_path: str
async def return_vectors(user_id: int, user_repo: UserRepository, utils_repo: UtilsRepository):
    """Return the ids of the user's memory vector store and the knowledge-base store.

    Each store is looked up through its repository first. When the lookup
    yields nothing, a new vector store is created through ``client`` and its
    id is persisted via the same repository before being returned.

    Returns:
        A ``(user_memory_store_id, knowledge_store_id)`` tuple.
    """
    # Per-user memory store.
    stored_memory = await user_repo.get_memory_vector(user_id=user_id)
    if stored_memory:
        memory_store_id = stored_memory.id_vector
    else:
        created_store = await client.vector_stores.create(name=f"user_memory_{user_id}")
        await user_repo.add_memory_vector(user_id=user_id, vector_store_id=created_store.id)
        memory_store_id = created_store.id

    # Knowledge-base store (looked up without a user id).
    stored_knowledge = await utils_repo.get_knowledge_vectore_store_id()
    if stored_knowledge:
        knowledge_store_id = stored_knowledge.id_vector
    else:
        created_store = await client.vector_stores.create(name="knowledge_base")
        await utils_repo.add_knowledge_vectore_store_id(created_store.id)
        knowledge_store_id = created_store.id

    return memory_store_id, knowledge_store_id
async def encode_image(image_path):
    """Read the file at ``image_path`` and return its contents as a base64 string."""
    async with aiofiles.open(image_path, "rb") as source:
        raw_bytes = await source.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
async def _build_history_input(messages, user_id: int) -> list:
    """Convert stored chat messages into input items for ``Runner.run``.

    A message whose content contains ``image_<user_id>`` is treated as a stored
    image message: the part before the first ``|`` is read as an image file
    path and inlined as a base64 JPEG data URL, and the rest is sent as the
    accompanying text. Every other message is passed through unchanged.
    """
    image_marker = f'image_{user_id}'
    history = []
    for message in messages:
        content = message.content
        if image_marker in content:
            # Split on the FIRST '|' only, so text that itself contains '|'
            # is kept whole instead of being cut down to its last segment.
            # NOTE(review): assumes the stored format is "<path>|<text>" — confirm
            # against the code that writes these messages.
            image_path, separator, image_text = content.partition('|')
            if not separator:
                # No separator at all: keep the previous behaviour of using the
                # whole content as both the path and the text.
                image_text = content
            content = [
                {"type": "input_text", "text": image_text},
                {
                    "type": "input_image",
                    "image_url": f"data:image/jpeg;base64,{await encode_image(image_path)}",
                },
            ]
        history.append({'role': message.role, 'content': content})
    return history


def _sum_token_usage(raw_responses):
    """Return ``(input_tokens, output_tokens)`` summed over the raw responses of a run."""
    input_tokens = 0
    output_tokens = 0
    for response in raw_responses:
        input_tokens += response.usage.input_tokens
        output_tokens += response.usage.output_tokens
    return input_tokens, output_tokens


async def text_request(text: str, user: User, user_repo: UserRepository, utils_repo: UtilsRepository,
                       redis: Redis, mcp_server_1: MCPServerStdio, bot: Bot):
    """Run the main agent on a text message and return its answer.

    The stored message history is sent ahead of ``text``. After the run, the
    Redis key ``image_<telegram_id>`` is checked; when it holds a value, that
    JSON payload supplies an image file path and the image token counts. The
    file is read into memory and removed from disk, and the key is deleted.

    Args:
        text: The user's message.
        user: The requesting user; ``telegram_id`` identifies them everywhere.
        user_repo: Source of the message history, wallet and memory store.
        utils_repo: Source of the knowledge-base store id.
        redis: Connection used to read and delete the image entry.
        mcp_server_1: MCP server handed to the main agent.
        bot: Not used by this function; kept for interface compatibility.

    Returns:
        An ``AnswerText``. ``image_bytes`` is None and the image token counts
        are 0 when no image entry was present.
    """
    user_id = user.telegram_id
    vector_store_id, knowledge_id = await return_vectors(user_id=user_id, user_repo=user_repo,
                                                         utils_repo=utils_repo)
    messages = await user_repo.get_messags(user_id=user_id)
    user_wallet = await user_repo.get_wallet(user_id=user_id)

    main_agent = await create_main_agent(user_memory_id=vector_store_id, knowledge_id=knowledge_id,
                                         mcp_server_1=mcp_server_1, user_id=user_id,
                                         private_key=user_wallet)
    history = await _build_history_input(messages, user_id)
    runner = await Runner.run(
        starting_agent=main_agent,
        input=history + [{'role': 'user', 'content': text}],
        context=(client, user_id),
        run_config=RunConfig(
            tracing_disabled=False
        )
    )

    input_tokens, output_tokens = _sum_token_usage(runner.raw_responses)
    answer = runner.final_output

    image_key = f'image_{user_id}'
    image_payload = await redis.get(image_key)
    if not image_payload:
        return AnswerText(answer=answer, image_bytes=None, input_tokens=input_tokens,
                          input_tokens_image=0, output_tokens=output_tokens, output_tokens_image=0)

    image_answer = json.loads(image_payload)
    await redis.delete(image_key)
    image_path = image_answer['image']
    input_tokens_image = image_answer['input_tokens']
    output_tokens_image = image_answer['output_tokens']

    try:
        async with aiofiles.open(image_path, "rb") as image_file:
            image_bytes = await image_file.read()
    finally:
        # The Redis entry pointing at this file is already gone, so remove the
        # file even when reading failed; otherwise nothing could find it again.
        try:
            os.remove(image_path)
        except FileNotFoundError:
            # Already missing: nothing to clean up, and raising here would
            # mask the original read error.
            pass

    return AnswerText(answer=answer, image_bytes=image_bytes, input_tokens=input_tokens,
                      input_tokens_image=input_tokens_image, output_tokens=output_tokens,
                      output_tokens_image=output_tokens_image)


async def image_request(image_bytes: bytes, user: User, user_repo: UserRepository,
                        utils_repo: UtilsRepository, redis: Redis, mcp_server_1: MCPServerStdio, bot: Bot,
                        caption: Optional[str] = None):
    """Run the main agent on an incoming image and return its answer.

    The image is written to ``images/image_<telegram_id>_<uuid>.jpeg`` and is
    also sent inline, as a base64 data URL, after the stored message history.
    The accompanying text is ``caption``, or ``'.'`` when no caption is given.

    Args:
        image_bytes: Raw bytes of the incoming image.
        user: The requesting user; ``telegram_id`` identifies them everywhere.
        user_repo: Source of the message history, wallet and memory store.
        utils_repo: Source of the knowledge-base store id.
        redis: Not used by this function; kept for interface compatibility.
        mcp_server_1: MCP server handed to the main agent.
        bot: Not used by this function; kept for interface compatibility.
        caption: Optional text sent together with the image.

    Returns:
        An ``AnswerImage`` whose ``image_path`` is the file written here.
    """
    user_id = user.telegram_id
    vector_store_id, knowledge_id = await return_vectors(user_id=user_id, user_repo=user_repo,
                                                         utils_repo=utils_repo)
    messages = await user_repo.get_messags(user_id=user_id)
    user_wallet = await user_repo.get_wallet(user_id=user_id)

    # Build the path once so the written file and the returned path cannot drift apart.
    image_path = f"images/image_{user_id}_{uuid.uuid4()}.jpeg"
    # Create the target directory on first use instead of failing on a fresh checkout.
    os.makedirs("images", exist_ok=True)
    async with aiofiles.open(image_path, "wb") as image_file:
        await image_file.write(image_bytes)

    try:
        main_agent = await create_main_agent(user_memory_id=vector_store_id, knowledge_id=knowledge_id,
                                             mcp_server_1=mcp_server_1, user_id=user_id,
                                             private_key=user_wallet)
        history = await _build_history_input(messages, user_id)
        new_message = {
            'role': 'user',
            'content': [
                {"type": "input_text", "text": caption or '.'},
                {
                    "type": "input_image",
                    "image_url": f"data:image/jpeg;base64,{base64.b64encode(image_bytes).decode('utf-8')}",
                },
            ],
        }
        runner = await Runner.run(
            starting_agent=main_agent,
            input=history + [new_message],
            context=(client, user_id),
            run_config=RunConfig(
                tracing_disabled=False
            )
        )
    except BaseException:
        # The generated path is only reported through the return value, so a
        # failed run would leave an orphaned file; remove it and re-raise.
        try:
            os.remove(image_path)
        except OSError:
            pass
        raise

    input_tokens, output_tokens = _sum_token_usage(runner.raw_responses)
    return AnswerImage(answer=runner.final_output, input_tokens=input_tokens,
                       output_tokens=output_tokens, image_path=image_path)
async def send_raw_response(bot: Bot, raw_response: str):
    """Send ``raw_response`` to the admin chat as a ``raw_response.txt`` document.

    Args:
        bot: Bot instance used to send the document.
        raw_response: Text to upload; it is encoded as UTF-8.
    """
    # BufferedInputFile takes the bytes directly, so no intermediate BytesIO is
    # needed (the old buffer was also left unclosed if sending raised).
    payload = raw_response.encode("utf-8")
    await bot.send_document(
        chat_id=ADMIN_ID,
        document=BufferedInputFile(payload, filename='raw_response.txt')
    )