import re
from agents.mcp import MCPServerStdio
from aiogram.types import Message, BufferedInputFile
from chatgpt_md_converter import telegram_format
from redis.asyncio.client import Redis
from bot.utils.calculate_tokens import calculate_tokens
from database.models import User
from database.repositories.user import UserRepository
from database.repositories.utils import UtilsRepository
from bot.utils.agent_requests import AnswerText, text_request, AnswerImage, image_request
import bot.keyboards.inline as inline_kb
from config import TOKENS_LIMIT_FOR_WARNING_MESSAGE


async def send_answer_text(user_ques: str, message: Message, answer: AnswerText, user: User,
                           user_repo: UserRepository, i18n):
    """Deliver an agent answer to the user and persist the exchange in the dialog context.

    If the answer carries image bytes, the image is sent with the answer text as
    its caption; otherwise the answer text is split into Telegram-sized HTML
    chunks and sent as one or more messages, attaching the "export as markdown"
    keyboard to the last chunk. In both cases the user question and the
    assistant answer are appended to the stored context.
    """
    if answer.image_bytes:
        # Image answer: one photo message, caption = answer text.
        photo_file = BufferedInputFile(answer.image_bytes, filename=f"{user.telegram_id}.jpeg")
        await message.answer_photo(photo=photo_file, caption=answer.answer)
        await user_repo.add_context(user_id=user.telegram_id, role='user', content=user_ques)
        await user_repo.add_context(user_id=user.telegram_id, role='assistant', content=answer.answer)
        return

    # Text answer: persist context first so the assistant row id can be
    # embedded into the markdown-export keyboard of the final chunk.
    await user_repo.add_context(user_id=user.telegram_id, role='user', content=user_ques)
    row_id = await user_repo.add_context(user_id=user.telegram_id, role='assistant', content=answer.answer)

    chunks = split_code_message(answer.answer)
    total = len(chunks)
    for position, chunk in enumerate(chunks, 1):
        markup = None
        if position == total:
            # Only the last chunk gets the inline keyboard.
            markup = inline_kb.keyboard_md(row_id=row_id, text=i18n.get('answer_md'))
        await message.answer(chunk, reply_markup=markup)
def split_code_message(text, type_: str = None):
    """
    Reliably split Telegram HTML into chunks while preserving valid markup.

    - Self-closing tags are not pushed to the stack and therefore are not closed.
    - For opened tags we store the full opening form including attributes to re-open later.
    - Never split inside an HTML tag or inside an HTML entity.
    - Preserve Telegram-specific nuances such as <br> and <blockquote expandable> blocks.

    :param text: source text; when ``type_`` is falsy it is first converted
        from markdown to Telegram HTML via ``telegram_format``.
    :param type_: any truthy value skips the markdown->HTML conversion step
        (the text is assumed to already be Telegram HTML).
    :return: list of HTML chunks, each kept under the Telegram message limit,
        with open tags closed at chunk boundaries and re-opened in the next chunk.
    """
    if not type_:
        text = telegram_format(text)
        # NOTE(review): replacing the opening tag with '' leaves any matching
        # </blockquote> unbalanced (it will be escaped as plain text below) —
        # confirm the intended replacement, possibly '<blockquote>'.
        text = text.replace('<blockquote expandable>', '')

    # Escape HTML comments so they are treated as text,
    # not as tags that could break the open/close stack while splitting
    comment_pattern = re.compile(r"<!--.*?-->", re.DOTALL)

    def _escape_comment(m):
        c = m.group(0)
        return c.replace('<', '&lt;').replace('>', '&gt;')

    text = comment_pattern.sub(_escape_comment, text)

    chunks = []
    current_chunk = ""
    # Stack of opened tags: items are dicts {name, open}
    open_stack = []
    position = 0
    tag_pattern = re.compile(r"<(\/)?([a-zA-Z0-9\-]+)([^>]*)>")

    # Set of self-closing/non-closing tags in Telegram HTML context
    SELF_CLOSING = {"br"}

    def is_self_closing(tag_name: str, tag_full: str) -> bool:
        return tag_name in SELF_CLOSING or tag_full.strip().endswith('/>')

    def close_open_tags() -> str:
        # Close only normal opened tags in reverse order
        closing = []
        for item in reversed(open_stack):
            closing.append(f"</{item['name']}>")
        return "".join(closing)

    def reopen_tags() -> str:
        # Re-open saved opening tags (with attributes) in original order.
        # For blockquote expandable we keep the original form as-is.
        return "".join(item['open'] for item in open_stack)

    def escape_tag_text(tag_text: str) -> str:
        """Render a tag as plain text by escaping angle brackets."""
        return tag_text.replace('<', '&lt;').replace('>', '&gt;')

    def safe_cut_index(text_: str, start: int, tentative_end: int) -> int:
        """Shift a split position so that we never cut inside a tag or an HTML entity."""
        end = min(tentative_end, len(text_))
        if end <= start:
            return end
        segment = text_[start:end]
        # 1) Do not split inside a tag: if the last '<' is after the last '>' -> move back to that '<'
        last_lt = segment.rfind('<')
        last_gt = segment.rfind('>')
        if last_lt != -1 and (last_gt == -1 or last_lt > last_gt):
            end = start + last_lt
            if end <= start:
                return start
            segment = text_[start:end]
        # 2) Do not split inside an entity: if there's '&' after the last ';' -> move back to that '&'
        last_amp = segment.rfind('&')
        last_semi = segment.rfind(';')
        if last_amp != -1 and (last_semi == -1 or last_amp > last_semi):
            end = start + last_amp
        return end

    text_len = len(text)
    while position < text_len:
        # Dynamic budget for the current chunk: reserve room for the closing
        # sequence of every open tag plus their re-openers in the next chunk.
        SAFETY = 64
        BASE_LIMIT = 3900
        allowed_total = BASE_LIMIT - len(close_open_tags()) - len(reopen_tags()) - SAFETY
        # Clamp to reasonable bounds just in case
        if allowed_total < 1000:
            allowed_total = 1000
        elif allowed_total > BASE_LIMIT:
            allowed_total = BASE_LIMIT

        # If current chunk is full — close and start a new one
        if len(current_chunk) >= allowed_total:
            current_chunk += close_open_tags()
            chunks.append(current_chunk)
            current_chunk = reopen_tags()

        # Compute the boundary where we can safely write more characters
        tentative_end = position + (allowed_total - len(current_chunk))
        if tentative_end <= position:
            # No room left — force a chunk break
            current_chunk += close_open_tags()
            chunks.append(current_chunk)
            current_chunk = reopen_tags()
            continue

        # Look for the next tag before the boundary
        next_match = tag_pattern.search(text, position, min(tentative_end, text_len))

        if not next_match:
            # No tags before the boundary — split at a safe position
            cut_idx = safe_cut_index(text, position, min(tentative_end, text_len))
            if cut_idx == position:
                # No safe position found in the window — extend the window to find the next tag/entity end
                extend_end = min(position + 100 + (allowed_total - len(current_chunk)), text_len)
                next_match_ext = tag_pattern.search(text, position, extend_end)
                if next_match_ext:
                    cut_idx = next_match_ext.start()
                else:
                    # No complete tag found in lookahead — split before a partial '<...'
                    extended_segment = text[position:extend_end]
                    last_lt = extended_segment.rfind('<')
                    if last_lt != -1:
                        # Check if there's '>' after that '<' in the extended window
                        gt_after = extended_segment.find('>', last_lt + 1)
                        if gt_after == -1:
                            # Tag is not completed within the window — cut before '<'
                            cut_idx = position + last_lt
                        else:
                            cut_idx = extend_end
                    else:
                        cut_idx = extend_end
            # Zero-shift guard (when cut_idx == position):
            # happens if a partial tag starts exactly at 'position'.
            if cut_idx == position:
                if current_chunk:
                    # Close current chunk and start a new one before continuing
                    current_chunk += close_open_tags()
                    chunks.append(current_chunk)
                    current_chunk = reopen_tags()
                    continue
                else:
                    # Current chunk is empty — extend search forward to the next '>' and advance at least to it
                    search_end = min(position + 300, text_len)
                    gt_global = text.find('>', position, search_end)
                    if gt_global != -1:
                        cut_idx = gt_global + 1
                    else:
                        # Last resort — move to search_end to avoid infinite loop
                        cut_idx = search_end
            current_chunk += text[position:cut_idx]
            position = cut_idx
            continue

        # There is a tag before the boundary
        start_tag, end_tag = next_match.span()
        tag_full = next_match.group(0)
        is_closing = next_match.group(1) == "/"
        tag_name = next_match.group(2)
        _ = next_match.group(3)

        # If text before the tag doesn't fit — break the chunk
        if (start_tag - position) + len(current_chunk) > allowed_total:
            current_chunk += close_open_tags()
            chunks.append(current_chunk)
            current_chunk = reopen_tags()
            continue

        # Append text up to the tag
        current_chunk += text[position:start_tag]
        position = start_tag

        # Tag handling
        if is_closing:
            # Prefer strict LIFO, but outside pre/code try to fix nesting to preserve formatting
            if open_stack and open_stack[-1]['name'] == tag_name:
                # Does the tag itself fit into the current chunk?
                if len(current_chunk) + (end_tag - start_tag) > allowed_total:
                    current_chunk += close_open_tags()
                    chunks.append(current_chunk)
                    current_chunk = reopen_tags()
                current_chunk += tag_full
                # Pop the top tag
                open_stack.pop()
            else:
                if open_stack and open_stack[-1]['name'] in {"pre", "code"}:
                    # Inside pre/code escape foreign closing tags as text
                    escaped = escape_tag_text(tag_full)
                    if len(current_chunk) + len(escaped) > allowed_total:
                        current_chunk += close_open_tags()
                        chunks.append(current_chunk)
                        current_chunk = reopen_tags()
                    current_chunk += escaped
                else:
                    # Outside pre/code: normalize nesting by auto-closing tags down to target.
                    # Find the target tag in the stack (from the end). If not found — escape as text.
                    target_idx = None
                    for idx in range(len(open_stack) - 1, -1, -1):
                        if open_stack[idx]['name'] == tag_name:
                            target_idx = idx
                            break
                    if target_idx is None:
                        escaped = escape_tag_text(tag_full)
                        if len(current_chunk) + len(escaped) > allowed_total:
                            current_chunk += close_open_tags()
                            chunks.append(current_chunk)
                            current_chunk = reopen_tags()
                        current_chunk += escaped
                    else:
                        # Close all tags above the target sequentially
                        names_above = [open_stack[i]['name'] for i in range(len(open_stack) - 1, target_idx, -1)]
                        estimated = sum(len(f"</{n}>") for n in names_above) + (end_tag - start_tag)
                        if len(current_chunk) + estimated > allowed_total:
                            # Start a new chunk before emitting the closing sequence to stay within budget
                            current_chunk += close_open_tags()
                            chunks.append(current_chunk)
                            current_chunk = reopen_tags()
                        # Emit the closing tags for the ones above the target
                        for n in names_above:
                            current_chunk += f"</{n}>"
                            open_stack.pop()
                        # Finally append the original closing tag for the target and pop it
                        current_chunk += tag_full
                        open_stack.pop()  # pop the target tag
        else:
            # Opening tag
            # If we are inside pre/code and encounter a non pre/code tag — escape as text, do not push to stack
            if open_stack and open_stack[-1]['name'] in {"pre", "code"} and tag_name not in {"pre", "code"}:
                escaped_open = escape_tag_text(tag_full)
                if len(current_chunk) + len(escaped_open) > allowed_total:
                    current_chunk += close_open_tags()
                    chunks.append(current_chunk)
                    current_chunk = reopen_tags()
                current_chunk += escaped_open
            else:
                if len(current_chunk) + (end_tag - start_tag) > allowed_total:
                    current_chunk += close_open_tags()
                    chunks.append(current_chunk)
                    current_chunk = reopen_tags()
                current_chunk += tag_full
                # Do not push self-closing tags to the stack
                if not is_self_closing(tag_name, tag_full):
                    # Save the original opening form with attributes.
                    # Special case blockquote expandable — keep as-is.
                    opening = tag_full
                    open_stack.append({
                        'name': tag_name,
                        'open': opening,
                    })
        position = end_tag

    # Finalization
    if current_chunk:
        current_chunk += close_open_tags()
        chunks.append(current_chunk)
    return chunks
opening = tag_full open_stack.append({ 'name': tag_name, 'open': opening, }) position = end_tag # Finalization if current_chunk: current_chunk += close_open_tags() chunks.append(current_chunk) return chunks async def process_after_text(message: Message, user: User, user_repo: UserRepository, utils_repo: UtilsRepository, redis: Redis, i18n, mess_to_delete: Message, mcp_server_1: MCPServerStdio, scheduler, text_from_voice: str = None, constant_text: str = None): try: if text_from_voice: user_ques = text_from_voice elif constant_text: user_ques = constant_text else: user_ques = message.text answer = await text_request(text=user_ques, user=user, user_repo=user_repo, utils_repo=utils_repo, redis=redis, mcp_server_1=mcp_server_1, bot=message.bot, scheduler=scheduler) await send_answer_text(user_ques=user_ques, message=message, answer=answer, user=user, user_repo=user_repo, i18n=i18n) if answer.input_tokens + answer.output_tokens > TOKENS_LIMIT_FOR_WARNING_MESSAGE: await message.answer(i18n.get('warning_text_tokens')) await calculate_tokens(user=user, user_repo=user_repo, input_tokens_text=answer.input_tokens, input_tokens_img=answer.input_tokens_image, output_tokens_text=answer.output_tokens, output_tokens_img=answer.output_tokens_image) except Exception as e: print(e) await message.answer(text=i18n.get('warning_text_error')) finally: await redis.delete(f'request_{message.from_user.id}') await mess_to_delete.delete() async def send_answer_photo(message: Message, answer: AnswerImage, user: User, user_repo: UserRepository): caption = message.caption if message.caption else '.' 
async def send_answer_photo(message: Message, answer: AnswerImage, user: User, user_repo: UserRepository):
    """Deliver the agent's answer to a photo question and persist the exchange.

    The stored user-context entry combines the saved image path with the
    user's caption (``'.'`` when no caption was given); the answer text is
    split into Telegram-sized chunks and sent message by message.
    """
    caption = message.caption if message.caption else '.'
    await user_repo.add_context(user_id=user.telegram_id, role='user',
                                content=f'{answer.image_path}|{caption}')
    await user_repo.add_context(user_id=user.telegram_id, role='assistant', content=answer.answer)

    messages = split_code_message(answer.answer)
    # Fix: the index from enumerate was never used — iterate the chunks directly.
    for mess in messages:
        await message.answer(mess)


async def process_after_photo(message: Message, user: User, user_repo: UserRepository, utils_repo: UtilsRepository,
                              redis: Redis, i18n, mess_to_delete: Message, mcp_server_1: MCPServerStdio, scheduler):
    """Run the full photo-question pipeline for one incoming message.

    Downloads the highest-resolution photo, asks the agent about it,
    delivers the answer, warns about heavy token usage, and accounts the
    spent text tokens (image token counters are intentionally passed as 0
    here). The per-user "request in progress" Redis flag and the placeholder
    message are always cleaned up.
    """
    try:
        # Last photo size is the largest one Telegram provides.
        file_id = message.photo[-1].file_id
        file_path = await message.bot.get_file(file_id=file_id)
        file_bytes = (await message.bot.download_file(file_path.file_path)).read()

        answer = await image_request(image_bytes=file_bytes, user=user, user_repo=user_repo, utils_repo=utils_repo,
                                     redis=redis, mcp_server_1=mcp_server_1, bot=message.bot,
                                     caption=message.caption, scheduler=scheduler)
        await send_answer_photo(message=message, answer=answer, user=user, user_repo=user_repo)

        # Warn the user when the request consumed an unusually large budget.
        if answer.input_tokens + answer.output_tokens > TOKENS_LIMIT_FOR_WARNING_MESSAGE:
            await message.answer(i18n.get('warning_text_tokens'))

        await calculate_tokens(user=user, user_repo=user_repo,
                               input_tokens_text=answer.input_tokens, input_tokens_img=0,
                               output_tokens_text=answer.output_tokens, output_tokens_img=0)
    except Exception as e:
        # Fix: the exception was captured but silently discarded; surface it
        # the same way process_after_text does before notifying the user.
        print(e)
        await message.answer(text=i18n.get('warning_text_error'))
    finally:
        # Always release the in-flight lock and remove the "thinking" placeholder.
        await redis.delete(f'request_{message.from_user.id}')
        await mess_to_delete.delete()