import base64 import json from datetime import datetime from typing import Literal, Optional import aiofiles from agents import function_tool, RunContextWrapper from openai import AsyncOpenAI from apscheduler.schedulers.asyncio import AsyncIOScheduler from redis_service.connect import redis from database.repositories.user import UserRepository from bot.utils.executed_tasks import execute_task @function_tool async def image_gen_tool(wrapper: RunContextWrapper, prompt: str) -> str: """The function generates an image at the user's request. A prompt must be provided to generate the image. Args: prompt: Prompt for image generation. """ client: AsyncOpenAI = wrapper.context[0] img = await client.images.generate( model="gpt-image-1", prompt=prompt, n=1, size="1024x1024" ) image_base64 = img.data[0].b64_json image_bytes = base64.b64decode(image_base64) async with aiofiles.open(f"images/image_{wrapper.context[1]}.png", "wb") as f: await f.write(image_bytes) data = {'image': f"images/image_{wrapper.context[1]}.png", 'input_tokens': img.usage.input_tokens, 'output_tokens': img.usage.output_tokens} await redis.set(f'image_{wrapper.context[1]}', json.dumps(data)) return 'The image is generated' @function_tool async def create_task_tool( ctx: RunContextWrapper, description: str, agent_message: str, schedule_type: Literal["once", "daily", "interval"], time_str: Optional[str] = None, date_str: Optional[str] = None, interval_minutes: Optional[int] = None ) -> str: """Creates a new task in scheduler. Args: description: Task description from user agent_message: Message to send to main agent when executing for answer to question schedule_type: Schedule type (once, daily, interval) time_str: Time in HH:MM format for daily schedule date_str: Date in YYYY-MM-DD format for once schedule interval_minutes: Interval in minutes for interval schedule Returns: Message about task creation result """ if schedule_type == "once" and not date_str: return "Error: date must be specified for one-time task" if schedule_type == "daily" and not time_str: return "Error: time must be specified for daily task" if schedule_type == "interval" and not interval_minutes: return "Error: interval in minutes must be specified for interval task" user_repo: UserRepository = ctx.context[2] scheduler: AsyncIOScheduler = ctx.context[3] task_id = await user_repo.add_task(user_id=ctx.context[1], description=description, agent_message=agent_message, schedule_type=schedule_type, time_str=time_str, date_str=date_str, interval_minutes=interval_minutes) if schedule_type == "once": if time_str: task_datetime = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M") else: task_datetime = datetime.strptime(f"{date_str} 12:00", "%Y-%m-%d %H:%M") scheduler.add_job( execute_task, 'date', run_date=task_datetime, args=[ctx.context[1], task_id], id=f'{ctx.context[1]}_{task_id}' ) elif schedule_type == "daily": task_time = datetime.strptime(time_str, "%H:%M").time() scheduler.add_job( execute_task, 'cron', hour=task_time.hour, minute=task_time.minute, args=[ctx.context[1], task_id], id=f'{ctx.context[1]}_{task_id}' ) elif schedule_type == "interval": scheduler.add_job( execute_task, 'interval', minutes=interval_minutes, args=[ctx.context[1], task_id], id=f'{ctx.context[1]}_{task_id}' ) return f"✅ Task successfully created!\nID: {task_id}\nDescription: {description}\nSchedule: {schedule_type}" @function_tool async def list_tasks_tool( ctx: RunContextWrapper, ) -> str: """Gets list of user tasks. Args: Returns: List of tasks in text format """ user_repo: UserRepository = ctx.context[2] tasks = await user_repo.get_all_tasks(user_id=ctx.context[1]) text_tasks = '\n'.join([f"Task ID[{task.id}]: {task.description}, {task.schedule_type}, " f"{'active' if task.is_active else 'inactive'}, {task.time_str or task.date_str or task.interval_minutes}" for task in tasks]) return text_tasks @function_tool async def update_task_tool( ctx: RunContextWrapper, task_id: int, description: Optional[str] = None, agent_message: Optional[str] = None, schedule_type: Optional[Literal["once", "daily", "interval"]] = None, time_str: Optional[str] = None, date_str: Optional[str] = None, interval_minutes: Optional[int] = None, is_active: Optional[bool] = None ) -> str: """Updates existing task. Args: task_id: Task ID to update description: New task description agent_message: New agent message schedule_type: New schedule type time_str: New time in HH:MM format date_str: New date in YYYY-MM-DD format interval_minutes: New interval in minutes is_active: New activity status Returns: Message about update result """ user_repo: UserRepository = ctx.context[2] scheduler: AsyncIOScheduler = ctx.context[3] task = await user_repo.get_task(ctx.context[1], task_id) if not task: return '❌ Task not found' schedule_type = schedule_type or task.schedule_type description = description or task.description agent_message = agent_message or task.agent_message time_str = time_str or task.time_str date_str = date_str or task.date_str interval_minutes = interval_minutes or task.interval_minutes is_active = is_active or task.is_active await user_repo.update_task(ctx.context[1], task_id, description=description, agent_message=agent_message, is_active=is_active, schedule_type=schedule_type, time_str=time_str, date_str=date_str, interval_minutes=interval_minutes) try: scheduler.remove_job(f'{ctx.context[1]}_{task_id}') except: pass if schedule_type == "once": if time_str: task_datetime = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M") else: task_datetime = datetime.strptime(f"{date_str} 12:00", "%Y-%m-%d %H:%M") scheduler.add_job( execute_task, 'date', run_date=task_datetime, args=[ctx.context[1], task_id], id=f'{ctx.context[1]}_{task_id}' ) elif schedule_type == "daily": task_time = datetime.strptime(time_str, "%H:%M").time() scheduler.add_job( execute_task, 'cron', hour=task_time.hour, minute=task_time.minute, args=[ctx.context[1], task_id], id=f'{ctx.context[1]}_{task_id}' ) elif schedule_type == "interval": scheduler.add_job( execute_task, 'interval', minutes=interval_minutes, args=[ctx.context[1], task_id], id=f'{ctx.context[1]}_{task_id}' ) @function_tool async def delete_task_tool( ctx: RunContextWrapper, task_id: int ) -> str: """Deletes task from scheduler. Args: task_id: Task ID to delete Returns: Message about deletion result """ user_repo: UserRepository = ctx.context[2] scheduler: AsyncIOScheduler = ctx.context[3] await user_repo.delete_task(ctx.context[1], task_id) try: scheduler.remove_job(f'{ctx.context[1]}_{task_id}') except: pass return '✅ Task successfully deleted' @function_tool async def get_task_details_tool( ctx: RunContextWrapper, task_id: int ) -> str: """Gets detailed task information. Args: task_id: Task ID Returns: Detailed task information """ user_repo: UserRepository = ctx.context[2] task = await user_repo.get_task(ctx.context[1], task_id) if not task: return '❌ Task not found' return (f'📋 Task Details\n\n' f'ID: `{task.id}`\n' f'Description: {task.description}\n' f'Agent Message: {task.agent_message}\n' f'Schedule Type: {task.schedule_type}\n' f'Status: {"active" if task.is_active else "inactive"}' f'{"Interval" if task.schedule_type == "interval" else "Date"}: {task.time_str or task.date_str or task.interval_minutes}\n')