~cytrogen/evi-run

ref: be8c8546459f748da84f2defda95e570368713a7 evi-run/bot/agents_tools/tools.py -rw-r--r-- 8.8 KiB
be8c8546 — Bendy feat(agent): inject runtime UTC context; fix date handling policies; adjust deep_knowledge routing 5 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import base64
import json
from datetime import datetime
from typing import Literal, Optional

import aiofiles
from agents import function_tool, RunContextWrapper
from openai import AsyncOpenAI
from apscheduler.schedulers.asyncio import AsyncIOScheduler


from redis_service.connect import redis
from database.repositories.user import UserRepository
from bot.utils.executed_tasks import execute_task


@function_tool
async def image_gen_tool(wrapper: RunContextWrapper, prompt: str) -> str:
    """The function generates an image at the user's request. A prompt must be provided to generate the image.

    Args:
        prompt: Prompt for image generation.
    """

    client: AsyncOpenAI = wrapper.context[0]

    img = await client.images.generate(
        model="gpt-image-1",
        prompt=prompt,
        n=1,
        size="1024x1024"
    )
    image_base64 = img.data[0].b64_json
    image_bytes = base64.b64decode(image_base64)

    async with aiofiles.open(f"images/image_{wrapper.context[1]}.png", "wb") as f:
        await f.write(image_bytes)

    data = {'image': f"images/image_{wrapper.context[1]}.png", 'input_tokens': img.usage.input_tokens, 'output_tokens': img.usage.output_tokens}

    await redis.set(f'image_{wrapper.context[1]}', json.dumps(data))

    return 'The image is generated'


@function_tool
async def create_task_tool(
        ctx: RunContextWrapper,
        description: str,
        agent_message: str,
        schedule_type: Literal["once", "daily", "interval"],
        time_str: Optional[str] = None,
        date_str: Optional[str] = None,
        interval_minutes: Optional[int] = None
) -> str:
    """Creates a new task in scheduler.

    Args:
        description: Task description from user
        agent_message: Message to send to main agent when executing for answer to question
        schedule_type: Schedule type (once, daily, interval)
        time_str: Time in HH:MM format for daily schedule
        date_str: Date in YYYY-MM-DD format for once schedule
        interval_minutes: Interval in minutes for interval schedule

    Returns:
        Message about task creation result
    """

    if schedule_type == "once" and not date_str:
        return "Error: date must be specified for one-time task"
    if schedule_type == "daily" and not time_str:
        return "Error: time must be specified for daily task"
    if schedule_type == "interval" and not interval_minutes:
        return "Error: interval in minutes must be specified for interval task"

    user_repo: UserRepository = ctx.context[2]
    scheduler: AsyncIOScheduler = ctx.context[3]

    task_id = await user_repo.add_task(user_id=ctx.context[1], description=description,
                                       agent_message=agent_message, schedule_type=schedule_type,
                                       time_str=time_str, date_str=date_str, interval_minutes=interval_minutes)

    if schedule_type == "once":
        if time_str:
            task_datetime = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M")
        else:
            task_datetime = datetime.strptime(f"{date_str} 12:00", "%Y-%m-%d %H:%M")

        scheduler.add_job(
            execute_task,
            'date',
            run_date=task_datetime,
            args=[ctx.context[1], task_id],
            id=f'{ctx.context[1]}_{task_id}'
        )

    elif schedule_type == "daily":
        task_time = datetime.strptime(time_str, "%H:%M").time()

        scheduler.add_job(
            execute_task,
            'cron',
            hour=task_time.hour,
            minute=task_time.minute,
            args=[ctx.context[1], task_id],
            id=f'{ctx.context[1]}_{task_id}'
        )

    elif schedule_type == "interval":
        scheduler.add_job(
            execute_task,
            'interval',
            minutes=interval_minutes,
            args=[ctx.context[1], task_id],
            id=f'{ctx.context[1]}_{task_id}'
        )

    return f"✅ Task successfully created!\nID: {task_id}\nDescription: {description}\nSchedule: {schedule_type}"


@function_tool
async def list_tasks_tool(
        ctx: RunContextWrapper,
) -> str:
    """Gets list of user tasks.

    Args:

    Returns:
        List of tasks in text format
    """
    user_repo: UserRepository = ctx.context[2]
    tasks = await user_repo.get_all_tasks(user_id=ctx.context[1])

    text_tasks = '\n'.join([f"Task ID[{task.id}]: {task.description}, {task.schedule_type}, "
                            f"{'active' if task.is_active else 'inactive'}, {task.time_str or task.date_str or task.interval_minutes}"
                            for task in tasks])

    return text_tasks

@function_tool
async def update_task_tool(
        ctx: RunContextWrapper,
        task_id: int,
        description: Optional[str] = None,
        agent_message: Optional[str] = None,
        schedule_type: Optional[Literal["once", "daily", "interval"]] = None,
        time_str: Optional[str] = None,
        date_str: Optional[str] = None,
        interval_minutes: Optional[int] = None,
        is_active: Optional[bool] = None
) -> str:
    """Updates existing task.

    Args:
        task_id: Task ID to update
        description: New task description
        agent_message: New agent message
        schedule_type: New schedule type
        time_str: New time in HH:MM format
        date_str: New date in YYYY-MM-DD format
        interval_minutes: New interval in minutes
        is_active: New activity status

    Returns:
        Message about update result
    """
    user_repo: UserRepository = ctx.context[2]
    scheduler: AsyncIOScheduler = ctx.context[3]

    task = await user_repo.get_task(ctx.context[1], task_id)
    if not task:
        return '❌ Task not found'

    schedule_type = schedule_type or task.schedule_type
    description = description or task.description
    agent_message = agent_message or task.agent_message
    time_str = time_str or task.time_str
    date_str = date_str or task.date_str
    interval_minutes = interval_minutes or task.interval_minutes
    is_active = is_active or task.is_active

    await user_repo.update_task(ctx.context[1], task_id, description=description,
                                agent_message=agent_message, is_active=is_active,
                                schedule_type=schedule_type, time_str=time_str,
                                date_str=date_str, interval_minutes=interval_minutes)
    try:
        scheduler.remove_job(f'{ctx.context[1]}_{task_id}')
    except:
        pass

    if schedule_type == "once":
        if time_str:
            task_datetime = datetime.strptime(f"{date_str} {time_str}", "%Y-%m-%d %H:%M")
        else:
            task_datetime = datetime.strptime(f"{date_str} 12:00", "%Y-%m-%d %H:%M")

        scheduler.add_job(
            execute_task,
            'date',
            run_date=task_datetime,
            args=[ctx.context[1], task_id],
            id=f'{ctx.context[1]}_{task_id}'
        )

    elif schedule_type == "daily":
        task_time = datetime.strptime(time_str, "%H:%M").time()

        scheduler.add_job(
            execute_task,
            'cron',
            hour=task_time.hour,
            minute=task_time.minute,
            args=[ctx.context[1], task_id],
            id=f'{ctx.context[1]}_{task_id}'
        )

    elif schedule_type == "interval":
        scheduler.add_job(
            execute_task,
            'interval',
            minutes=interval_minutes,
            args=[ctx.context[1], task_id],
            id=f'{ctx.context[1]}_{task_id}'
        )


@function_tool
async def delete_task_tool(
        ctx: RunContextWrapper,
        task_id: int
) -> str:
    """Deletes task from scheduler.

    Args:
        task_id: Task ID to delete

    Returns:
        Message about deletion result
    """

    user_repo: UserRepository = ctx.context[2]
    scheduler: AsyncIOScheduler = ctx.context[3]
    await user_repo.delete_task(ctx.context[1], task_id)

    try:
        scheduler.remove_job(f'{ctx.context[1]}_{task_id}')
    except:
        pass

    return '✅ Task successfully deleted'



@function_tool
async def get_task_details_tool(
        ctx: RunContextWrapper,
        task_id: int
) -> str:
    """Gets detailed task information.

    Args:
        task_id: Task ID

    Returns:
        Detailed task information
    """

    user_repo: UserRepository = ctx.context[2]

    task = await user_repo.get_task(ctx.context[1], task_id)
    if not task:
        return '❌ Task not found'

    return (f'📋 Task Details\n\n' 
           f'ID: `{task.id}`\n' 
           f'Description: {task.description}\n' 
           f'Agent Message: {task.agent_message}\n'
           f'Schedule Type: {task.schedule_type}\n'
           f'Status: {"active" if task.is_active else "inactive"}' 
           f'{"Interval" if task.schedule_type == "interval" else "Date"}: {task.time_str or task.date_str or task.interval_minutes}\n')