diff --git a/tests/test_utils.py b/tests/test_utils.py index 3569020e..e6dd35d4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -11,7 +11,13 @@ from utils.alert import Alert, AlertSeverity, register_alert_hook, send_alert from utils.config import Config, ProtocolConfig -from utils.telegram import TelegramError, send_error_message, send_telegram_message +from utils.telegram import ( + TelegramError, + format_rich_table, + send_error_message, + send_telegram_message, + send_telegram_rich_message, +) from utils.web3_wrapper import ( MAX_BACKOFF_SECONDS, ProviderConnectionError, @@ -290,6 +296,126 @@ def test_send_telegram_message_test_override(self, mock_post): "[yearn_timelock] Test message", ) + def test_format_rich_table(self): + table = format_rich_table( + ["Market", "Utilization"], + [["ETH < WETH", "97.4%"], ["USDC", "95.1%"]], + caption="Aave & Morpho", + alignments=["left", "right"], + ) + + self.assertEqual( + table, + '' + '' + '' + '' + "
Aave & Morpho
MarketUtilization
ETH < WETH97.4%
USDC95.1%
", + ) + + def test_format_rich_table_rejects_mismatched_rows(self): + with self.assertRaises(ValueError): + format_rich_table(["Market", "Utilization"], [["ETH"]]) + + @patch("utils.telegram.requests.post") + def test_send_telegram_rich_message_success(self, mock_post): + mock_response = unittest.mock.Mock() + mock_response.status_code = 200 + mock_response.raise_for_status = unittest.mock.Mock() + mock_post.return_value = mock_response + + with patch.dict( + os.environ, + { + "TELEGRAM_BOT_TOKEN_TEST": "test_token", + "TELEGRAM_CHAT_ID_TEST": "test_chat_id", + "LOG_LEVEL": "INFO", + }, + ): + send_telegram_rich_message("

Test

", "test") + + url = mock_post.call_args[0][0] + payload = mock_post.call_args[1]["json"] + self.assertIn("sendRichMessage", url) + self.assertEqual(payload["chat_id"], "test_chat_id") + self.assertEqual(payload["rich_message"], {"html": "

Test

", "skip_entity_detection": True}) + self.assertNotIn("parse_mode", payload) + + @patch("utils.telegram.requests.post") + def test_send_telegram_rich_message_with_topic(self, mock_post): + mock_response = unittest.mock.Mock() + mock_response.status_code = 200 + mock_response.raise_for_status = unittest.mock.Mock() + mock_post.return_value = mock_response + + with patch.dict( + os.environ, + { + "TELEGRAM_BOT_TOKEN_DEFAULT": "default_token", + "TELEGRAM_CHAT_ID_TOPICS": "topics_chat_id", + "TELEGRAM_TOPIC_ID_AAVE": "42", + "LOG_LEVEL": "INFO", + }, + ): + send_telegram_rich_message("
ok
", "aave", disable_notification=True) + + url = mock_post.call_args[0][0] + payload = mock_post.call_args[1]["json"] + self.assertIn("default_token", url) + self.assertEqual(payload["chat_id"], "topics_chat_id") + self.assertEqual(payload["message_thread_id"], 42) + self.assertTrue(payload["disable_notification"]) + + @patch("utils.telegram.requests.post") + def test_send_telegram_rich_message_test_override_labels_protocol(self, mock_post): + mock_response = unittest.mock.Mock() + mock_response.status_code = 200 + mock_response.raise_for_status = unittest.mock.Mock() + mock_post.return_value = mock_response + + with patch.dict( + os.environ, + { + "TELEGRAM_TEST_CHAT_ID": "dummy_group", + "TELEGRAM_BOT_TOKEN_DEFAULT": "default_token", + "TELEGRAM_BOT_TOKEN_AAVE": "aave_token", + "TELEGRAM_CHAT_ID_TOPICS": "topics_chat_id", + "TELEGRAM_TOPIC_ID_AAVE": "42", + "LOG_LEVEL": "INFO", + }, + ): + send_telegram_rich_message("

Test

", "aave") + + url = mock_post.call_args[0][0] + payload = mock_post.call_args[1]["json"] + self.assertIn("default_token", url) + self.assertNotIn("aave_token", url) + self.assertEqual(payload["chat_id"], "dummy_group") + self.assertEqual(payload["rich_message"]["html"], "

[aave]

Test

") + self.assertNotIn("message_thread_id", payload) + + @patch("utils.telegram.requests.post") + def test_send_telegram_rich_message_falls_back_to_plain_text(self, mock_post): + failure = requests.RequestException("Connection error") + mock_response = unittest.mock.Mock() + mock_response.status_code = 200 + mock_response.raise_for_status = unittest.mock.Mock() + mock_post.side_effect = [failure, mock_response] + + with patch.dict( + os.environ, + { + "TELEGRAM_BOT_TOKEN_TEST": "test_token", + "TELEGRAM_CHAT_ID_TEST": "test_chat_id", + "LOG_LEVEL": "INFO", + }, + ): + send_telegram_rich_message("

Test

", "test", fallback_message="Fallback") + + fallback_payload = mock_post.call_args_list[1][1]["json"] + self.assertEqual(fallback_payload["text"], "Fallback") + self.assertNotIn("parse_mode", fallback_payload) + class TestSendErrorMessage(unittest.TestCase): """Tests for utils.telegram.send_error_message (dedicated errors channel).""" diff --git a/utils/telegram.py b/utils/telegram.py index 2348f3b4..b1dcf786 100644 --- a/utils/telegram.py +++ b/utils/telegram.py @@ -1,5 +1,7 @@ import os import re +from collections.abc import Iterable, Sequence +from html import escape as html_escape import requests from dotenv import load_dotenv @@ -13,6 +15,11 @@ # Maximum message length allowed by Telegram API MAX_MESSAGE_LENGTH = 4096 +# Telegram Bot API 10.1 rich messages allow substantially larger structured +# messages than sendMessage. Keep this explicit so rich-message callers can +# fail over before handing Telegram malformed/truncated HTML. +MAX_RICH_MESSAGE_LENGTH = 32768 + # Channel key for operational errors/diagnostics (GraphQL/fetch failures, retries, # crashes). Routed to a dedicated chat so transient noise doesn't spam the # per-protocol alert groups. Resolves via the same env-var scheme as any other @@ -43,12 +50,98 @@ def escape_markdown(text: str) -> str: return text +def escape_rich_html(text: object) -> str: + """Escape text for Telegram rich-message HTML.""" + return html_escape(str(text), quote=True) + + +def format_rich_table( + headers: Sequence[object], + rows: Iterable[Sequence[object]], + caption: object | None = None, + alignments: Sequence[str] | None = None, + bordered: bool = True, + striped: bool = True, +) -> str: + """Render a simple Telegram rich-message HTML table. + + The returned string is meant for ``send_telegram_rich_message``. Cell + content is escaped as plain text; callers that need inline rich formatting + should build the table HTML themselves. + """ + column_count = len(headers) + if column_count == 0: + raise ValueError("Telegram rich tables need at least one column") + if column_count > 20: + raise ValueError("Telegram rich tables support at most 20 columns") + + alignments = alignments or () + if len(alignments) > column_count: + raise ValueError("Table alignments cannot exceed the number of columns") + + normalized_alignments: list[str | None] = [] + for alignment in alignments: + if alignment not in {"left", "center", "right"}: + raise ValueError("Table alignment must be one of: left, center, right") + normalized_alignments.append(alignment) + normalized_alignments.extend([None] * (column_count - len(normalized_alignments))) + + table_rows = [tuple(row) for row in rows] + for row in table_rows: + if len(row) != column_count: + raise ValueError("All Telegram rich table rows must have the same number of columns as headers") + + attrs = [] + if bordered: + attrs.append("bordered") + if striped: + attrs.append("striped") + table_tag = "" + + html_parts = [table_tag] + if caption is not None: + html_parts.append(f"{escape_rich_html(caption)}") + + def _cell(tag: str, value: object, alignment: str | None) -> str: + align_attr = f' align="{alignment}"' if alignment else "" + return f"<{tag}{align_attr}>{escape_rich_html(value)}" + + html_parts.append("") + html_parts.extend(_cell("th", value, normalized_alignments[index]) for index, value in enumerate(headers)) + html_parts.append("") + + for row in table_rows: + html_parts.append("") + html_parts.extend(_cell("td", value, normalized_alignments[index]) for index, value in enumerate(row)) + html_parts.append("") + + html_parts.append("") + return "".join(html_parts) + + class TelegramError(Exception): """Exception raised for errors in Telegram API interactions.""" pass +def _telegram_destination(protocol: str) -> tuple[str | None, str | None, str | None]: + """Resolve protocol routing to bot token, chat id, and optional topic id.""" + topic_id = os.getenv(f"TELEGRAM_TOPIC_ID_{protocol.upper()}") + + if topic_id: + return ( + os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT"), + os.getenv("TELEGRAM_CHAT_ID_TOPICS"), + topic_id, + ) + + bot_token = os.getenv(f"TELEGRAM_BOT_TOKEN_{protocol.upper()}") + if not bot_token: + bot_token = os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT") + return bot_token, os.getenv(f"TELEGRAM_CHAT_ID_{protocol.upper()}"), None + + def _post_message( bot_token: str, chat_id: str, @@ -91,6 +184,42 @@ def _post_message( ) +def _post_rich_message( + bot_token: str, + chat_id: str, + rich_message: dict[str, object], + disable_notification: bool, + topic_id: str | None = None, +) -> None: + """Send a rich message to Telegram, raising TelegramError on failure.""" + url = f"https://api.telegram.org/bot{bot_token}/sendRichMessage" + payload: dict[str, object] = { + "chat_id": chat_id, + "rich_message": rich_message, + "disable_notification": disable_notification, + } + if topic_id: + payload["message_thread_id"] = int(topic_id) + + try: + response = requests.post(url, json=payload, timeout=10) + response.raise_for_status() + except requests.RequestException as e: + body = "" + err_response = getattr(e, "response", None) + if err_response is not None: + try: + body = f" body={err_response.text}" + except Exception: + pass + raise TelegramError(_redact_bot_token(f"Failed to send telegram rich message: {e}{body}")) + + if response.status_code != 200: + raise TelegramError( + _redact_bot_token(f"Failed to send telegram rich message: {response.status_code} - {response.text}") + ) + + def send_telegram_message( message: str, protocol: str, @@ -140,25 +269,65 @@ def send_telegram_message( _post_message(bot_token, test_chat_id, f"{label}{message}", plain_text, disable_notification) return - # Check if this protocol has a topic ID configured (forum-style group) - topic_id = os.getenv(f"TELEGRAM_TOPIC_ID_{protocol.upper()}") + bot_token, chat_id, topic_id = _telegram_destination(protocol) - if topic_id: - # Topics always use the default bot and the shared topics chat + if not bot_token or not chat_id: + logger.warning("Missing Telegram credentials for %s", protocol) + return + + _post_message(bot_token, chat_id, message, plain_text, disable_notification, topic_id) + + +def send_telegram_rich_message( + html: str, + protocol: str, + disable_notification: bool = False, + fallback_message: str | None = None, + skip_entity_detection: bool = True, +) -> None: + """Send Telegram Bot API 10.1 rich-message HTML. + + Use this for structured content like tables. If the Bot API rejects the rich + payload and ``fallback_message`` is provided, the fallback is sent as plain + text through the existing ``sendMessage`` path. + """ + logger.debug("Sending telegram rich message:\n%s", html) + + if os.getenv("LOG_LEVEL", "INFO").upper() == "DEBUG": + logger.debug("Skipping Telegram rich send (LOG_LEVEL=DEBUG)") + return + + if len(html) > MAX_RICH_MESSAGE_LENGTH: + if fallback_message is not None: + send_telegram_message(fallback_message, protocol, disable_notification, plain_text=True) + return + raise TelegramError(f"Telegram rich message exceeds {MAX_RICH_MESSAGE_LENGTH} characters") + + rich_message: dict[str, object] = {"html": html, "skip_entity_detection": skip_entity_detection} + + test_chat_id = os.getenv("TELEGRAM_TEST_CHAT_ID") + if test_chat_id: bot_token = os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT") - chat_id = os.getenv("TELEGRAM_CHAT_ID_TOPICS") - else: - # Legacy per-protocol chat routing - bot_token = os.getenv(f"TELEGRAM_BOT_TOKEN_{protocol.upper()}") if not bot_token: - bot_token = os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT") - chat_id = os.getenv(f"TELEGRAM_CHAT_ID_{protocol.upper()}") + logger.warning("TELEGRAM_TEST_CHAT_ID set but TELEGRAM_BOT_TOKEN_DEFAULT missing") + return + labelled_html = f"

[{escape_rich_html(protocol)}]

{html}" + rich_message = {"html": labelled_html, "skip_entity_detection": skip_entity_detection} + _post_rich_message(bot_token, test_chat_id, rich_message, disable_notification) + return + bot_token, chat_id, topic_id = _telegram_destination(protocol) if not bot_token or not chat_id: logger.warning("Missing Telegram credentials for %s", protocol) return - _post_message(bot_token, chat_id, message, plain_text, disable_notification, topic_id) + try: + _post_rich_message(bot_token, chat_id, rich_message, disable_notification, topic_id) + except TelegramError: + if fallback_message is None: + raise + logger.exception("Failed to send Telegram rich message for %s; sending fallback", protocol) + send_telegram_message(fallback_message, protocol, disable_notification, plain_text=True) def _error_channel_configured() -> bool: