Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 127 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@

from utils.alert import Alert, AlertSeverity, register_alert_hook, send_alert
from utils.config import Config, ProtocolConfig
from utils.telegram import TelegramError, send_error_message, send_telegram_message
from utils.telegram import (
TelegramError,
format_rich_table,
send_error_message,
send_telegram_message,
send_telegram_rich_message,
)
from utils.web3_wrapper import (
MAX_BACKOFF_SECONDS,
ProviderConnectionError,
Expand Down Expand Up @@ -290,6 +296,126 @@ def test_send_telegram_message_test_override(self, mock_post):
"[yearn_timelock] Test message",
)

def test_format_rich_table(self):
table = format_rich_table(
["Market", "Utilization"],
[["ETH < WETH", "97.4%"], ["USDC", "95.1%"]],
caption="Aave & Morpho",
alignments=["left", "right"],
)

self.assertEqual(
table,
'<table bordered striped><caption>Aave &amp; Morpho</caption>'
'<tr><th align="left">Market</th><th align="right">Utilization</th></tr>'
'<tr><td align="left">ETH &lt; WETH</td><td align="right">97.4%</td></tr>'
'<tr><td align="left">USDC</td><td align="right">95.1%</td></tr>'
"</table>",
)

def test_format_rich_table_rejects_mismatched_rows(self):
with self.assertRaises(ValueError):
format_rich_table(["Market", "Utilization"], [["ETH"]])

@patch("utils.telegram.requests.post")
def test_send_telegram_rich_message_success(self, mock_post):
mock_response = unittest.mock.Mock()
mock_response.status_code = 200
mock_response.raise_for_status = unittest.mock.Mock()
mock_post.return_value = mock_response

with patch.dict(
os.environ,
{
"TELEGRAM_BOT_TOKEN_TEST": "test_token",
"TELEGRAM_CHAT_ID_TEST": "test_chat_id",
"LOG_LEVEL": "INFO",
},
):
send_telegram_rich_message("<p>Test</p>", "test")

url = mock_post.call_args[0][0]
payload = mock_post.call_args[1]["json"]
self.assertIn("sendRichMessage", url)
self.assertEqual(payload["chat_id"], "test_chat_id")
self.assertEqual(payload["rich_message"], {"html": "<p>Test</p>", "skip_entity_detection": True})
self.assertNotIn("parse_mode", payload)

@patch("utils.telegram.requests.post")
def test_send_telegram_rich_message_with_topic(self, mock_post):
mock_response = unittest.mock.Mock()
mock_response.status_code = 200
mock_response.raise_for_status = unittest.mock.Mock()
mock_post.return_value = mock_response

with patch.dict(
os.environ,
{
"TELEGRAM_BOT_TOKEN_DEFAULT": "default_token",
"TELEGRAM_CHAT_ID_TOPICS": "topics_chat_id",
"TELEGRAM_TOPIC_ID_AAVE": "42",
"LOG_LEVEL": "INFO",
},
):
send_telegram_rich_message("<table><tr><td>ok</td></tr></table>", "aave", disable_notification=True)

url = mock_post.call_args[0][0]
payload = mock_post.call_args[1]["json"]
self.assertIn("default_token", url)
self.assertEqual(payload["chat_id"], "topics_chat_id")
self.assertEqual(payload["message_thread_id"], 42)
self.assertTrue(payload["disable_notification"])

@patch("utils.telegram.requests.post")
def test_send_telegram_rich_message_test_override_labels_protocol(self, mock_post):
mock_response = unittest.mock.Mock()
mock_response.status_code = 200
mock_response.raise_for_status = unittest.mock.Mock()
mock_post.return_value = mock_response

with patch.dict(
os.environ,
{
"TELEGRAM_TEST_CHAT_ID": "dummy_group",
"TELEGRAM_BOT_TOKEN_DEFAULT": "default_token",
"TELEGRAM_BOT_TOKEN_AAVE": "aave_token",
"TELEGRAM_CHAT_ID_TOPICS": "topics_chat_id",
"TELEGRAM_TOPIC_ID_AAVE": "42",
"LOG_LEVEL": "INFO",
},
):
send_telegram_rich_message("<p>Test</p>", "aave")

url = mock_post.call_args[0][0]
payload = mock_post.call_args[1]["json"]
self.assertIn("default_token", url)
self.assertNotIn("aave_token", url)
self.assertEqual(payload["chat_id"], "dummy_group")
self.assertEqual(payload["rich_message"]["html"], "<p>[aave]</p><p>Test</p>")
self.assertNotIn("message_thread_id", payload)

@patch("utils.telegram.requests.post")
def test_send_telegram_rich_message_falls_back_to_plain_text(self, mock_post):
failure = requests.RequestException("Connection error")
mock_response = unittest.mock.Mock()
mock_response.status_code = 200
mock_response.raise_for_status = unittest.mock.Mock()
mock_post.side_effect = [failure, mock_response]

with patch.dict(
os.environ,
{
"TELEGRAM_BOT_TOKEN_TEST": "test_token",
"TELEGRAM_CHAT_ID_TEST": "test_chat_id",
"LOG_LEVEL": "INFO",
},
):
send_telegram_rich_message("<p>Test</p>", "test", fallback_message="Fallback")

fallback_payload = mock_post.call_args_list[1][1]["json"]
self.assertEqual(fallback_payload["text"], "Fallback")
self.assertNotIn("parse_mode", fallback_payload)


class TestSendErrorMessage(unittest.TestCase):
"""Tests for utils.telegram.send_error_message (dedicated errors channel)."""
Expand Down
191 changes: 180 additions & 11 deletions utils/telegram.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import re
from collections.abc import Iterable, Sequence
from html import escape as html_escape

import requests
from dotenv import load_dotenv
Expand All @@ -13,6 +15,11 @@
# Maximum message length allowed by Telegram API
MAX_MESSAGE_LENGTH = 4096

# Telegram Bot API 10.1 rich messages allow substantially larger structured
# messages than sendMessage. Keep this explicit so rich-message callers can
# fail over before handing Telegram malformed/truncated HTML.
MAX_RICH_MESSAGE_LENGTH = 32768

# Channel key for operational errors/diagnostics (GraphQL/fetch failures, retries,
# crashes). Routed to a dedicated chat so transient noise doesn't spam the
# per-protocol alert groups. Resolves via the same env-var scheme as any other
Expand Down Expand Up @@ -43,12 +50,98 @@ def escape_markdown(text: str) -> str:
return text


def escape_rich_html(text: object) -> str:
"""Escape text for Telegram rich-message HTML."""
return html_escape(str(text), quote=True)


def format_rich_table(
headers: Sequence[object],
rows: Iterable[Sequence[object]],
caption: object | None = None,
alignments: Sequence[str] | None = None,
bordered: bool = True,
striped: bool = True,
) -> str:
"""Render a simple Telegram rich-message HTML table.

The returned string is meant for ``send_telegram_rich_message``. Cell
content is escaped as plain text; callers that need inline rich formatting
should build the table HTML themselves.
"""
column_count = len(headers)
if column_count == 0:
raise ValueError("Telegram rich tables need at least one column")
if column_count > 20:
raise ValueError("Telegram rich tables support at most 20 columns")

alignments = alignments or ()
if len(alignments) > column_count:
raise ValueError("Table alignments cannot exceed the number of columns")

normalized_alignments: list[str | None] = []
for alignment in alignments:
if alignment not in {"left", "center", "right"}:
raise ValueError("Table alignment must be one of: left, center, right")
normalized_alignments.append(alignment)
normalized_alignments.extend([None] * (column_count - len(normalized_alignments)))

table_rows = [tuple(row) for row in rows]
for row in table_rows:
if len(row) != column_count:
raise ValueError("All Telegram rich table rows must have the same number of columns as headers")

attrs = []
if bordered:
attrs.append("bordered")
if striped:
attrs.append("striped")
table_tag = "<table" + (f" {' '.join(attrs)}" if attrs else "") + ">"

html_parts = [table_tag]
if caption is not None:
html_parts.append(f"<caption>{escape_rich_html(caption)}</caption>")

def _cell(tag: str, value: object, alignment: str | None) -> str:
align_attr = f' align="{alignment}"' if alignment else ""
return f"<{tag}{align_attr}>{escape_rich_html(value)}</{tag}>"

html_parts.append("<tr>")
html_parts.extend(_cell("th", value, normalized_alignments[index]) for index, value in enumerate(headers))
html_parts.append("</tr>")

for row in table_rows:
html_parts.append("<tr>")
html_parts.extend(_cell("td", value, normalized_alignments[index]) for index, value in enumerate(row))
html_parts.append("</tr>")

html_parts.append("</table>")
return "".join(html_parts)


class TelegramError(Exception):
"""Exception raised for errors in Telegram API interactions."""

pass


def _telegram_destination(protocol: str) -> tuple[str | None, str | None, str | None]:
"""Resolve protocol routing to bot token, chat id, and optional topic id."""
topic_id = os.getenv(f"TELEGRAM_TOPIC_ID_{protocol.upper()}")

if topic_id:
return (
os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT"),
os.getenv("TELEGRAM_CHAT_ID_TOPICS"),
topic_id,
)

bot_token = os.getenv(f"TELEGRAM_BOT_TOKEN_{protocol.upper()}")
if not bot_token:
bot_token = os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT")
return bot_token, os.getenv(f"TELEGRAM_CHAT_ID_{protocol.upper()}"), None


def _post_message(
bot_token: str,
chat_id: str,
Expand Down Expand Up @@ -91,6 +184,42 @@ def _post_message(
)


def _post_rich_message(
bot_token: str,
chat_id: str,
rich_message: dict[str, object],
disable_notification: bool,
topic_id: str | None = None,
) -> None:
"""Send a rich message to Telegram, raising TelegramError on failure."""
url = f"https://api.telegram.org/bot{bot_token}/sendRichMessage"
payload: dict[str, object] = {
"chat_id": chat_id,
"rich_message": rich_message,
"disable_notification": disable_notification,
}
if topic_id:
payload["message_thread_id"] = int(topic_id)

try:
response = requests.post(url, json=payload, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
body = ""
err_response = getattr(e, "response", None)
if err_response is not None:
try:
body = f" body={err_response.text}"
except Exception:
pass
raise TelegramError(_redact_bot_token(f"Failed to send telegram rich message: {e}{body}"))

if response.status_code != 200:
raise TelegramError(
_redact_bot_token(f"Failed to send telegram rich message: {response.status_code} - {response.text}")
)


def send_telegram_message(
message: str,
protocol: str,
Expand Down Expand Up @@ -140,25 +269,65 @@ def send_telegram_message(
_post_message(bot_token, test_chat_id, f"{label}{message}", plain_text, disable_notification)
return

# Check if this protocol has a topic ID configured (forum-style group)
topic_id = os.getenv(f"TELEGRAM_TOPIC_ID_{protocol.upper()}")
bot_token, chat_id, topic_id = _telegram_destination(protocol)

if topic_id:
# Topics always use the default bot and the shared topics chat
if not bot_token or not chat_id:
logger.warning("Missing Telegram credentials for %s", protocol)
return

_post_message(bot_token, chat_id, message, plain_text, disable_notification, topic_id)


def send_telegram_rich_message(
html: str,
protocol: str,
disable_notification: bool = False,
fallback_message: str | None = None,
skip_entity_detection: bool = True,
) -> None:
"""Send Telegram Bot API 10.1 rich-message HTML.

Use this for structured content like tables. If the Bot API rejects the rich
payload and ``fallback_message`` is provided, the fallback is sent as plain
text through the existing ``sendMessage`` path.
"""
logger.debug("Sending telegram rich message:\n%s", html)

if os.getenv("LOG_LEVEL", "INFO").upper() == "DEBUG":
logger.debug("Skipping Telegram rich send (LOG_LEVEL=DEBUG)")
return

if len(html) > MAX_RICH_MESSAGE_LENGTH:
if fallback_message is not None:
send_telegram_message(fallback_message, protocol, disable_notification, plain_text=True)
return
raise TelegramError(f"Telegram rich message exceeds {MAX_RICH_MESSAGE_LENGTH} characters")

rich_message: dict[str, object] = {"html": html, "skip_entity_detection": skip_entity_detection}

test_chat_id = os.getenv("TELEGRAM_TEST_CHAT_ID")
if test_chat_id:
bot_token = os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT")
chat_id = os.getenv("TELEGRAM_CHAT_ID_TOPICS")
else:
# Legacy per-protocol chat routing
bot_token = os.getenv(f"TELEGRAM_BOT_TOKEN_{protocol.upper()}")
if not bot_token:
bot_token = os.getenv("TELEGRAM_BOT_TOKEN_DEFAULT")
chat_id = os.getenv(f"TELEGRAM_CHAT_ID_{protocol.upper()}")
logger.warning("TELEGRAM_TEST_CHAT_ID set but TELEGRAM_BOT_TOKEN_DEFAULT missing")
return
labelled_html = f"<p>[{escape_rich_html(protocol)}]</p>{html}"
rich_message = {"html": labelled_html, "skip_entity_detection": skip_entity_detection}
_post_rich_message(bot_token, test_chat_id, rich_message, disable_notification)
return

bot_token, chat_id, topic_id = _telegram_destination(protocol)
if not bot_token or not chat_id:
logger.warning("Missing Telegram credentials for %s", protocol)
return

_post_message(bot_token, chat_id, message, plain_text, disable_notification, topic_id)
try:
_post_rich_message(bot_token, chat_id, rich_message, disable_notification, topic_id)
except TelegramError:
if fallback_message is None:
raise
logger.exception("Failed to send Telegram rich message for %s; sending fallback", protocol)
send_telegram_message(fallback_message, protocol, disable_notification, plain_text=True)


def _error_channel_configured() -> bool:
Expand Down
Loading