From 88ed20ea52010b3a2d4d4ec4676f4baf39d20009 Mon Sep 17 00:00:00 2001 From: iequidoo Date: Thu, 30 Apr 2026 11:23:42 -0300 Subject: [PATCH 1/2] test: Add function to get sent MDN as SentMessage --- src/smtp.rs | 89 ++++++++++++++++++++++++++--------------------- src/test_utils.rs | 18 +++++++++- 2 files changed, 66 insertions(+), 41 deletions(-) diff --git a/src/smtp.rs b/src/smtp.rs index 7f1bdf1dc8..cf983a18ab 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -492,7 +492,35 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> { return Ok(()); } - let more_mdns = send_mdn(context, connection).await?; + let more_mdns = send_mdn(context, async |rfc724_mid, body, recipients| { + let recipients: Vec<_> = recipients + .into_iter() + .filter_map(|addr| { + async_smtp::EmailAddress::new(addr.clone()) + .with_context(|| format!("Invalid recipient: {addr}")) + .log_err(context) + .ok() + }) + .collect(); + + match smtp_send(context, &recipients, &body, connection, None).await { + SendResult::Success => { + if !recipients.is_empty() { + info!(context, "Successfully sent MDN for {rfc724_mid}."); + } + Ok(true) + } + SendResult::Retry => { + info!( + context, + "Temporary SMTP failure while sending an MDN for {rfc724_mid}." + ); + Ok(false) + } + SendResult::Failure(err) => Err(err), + } + }) + .await?; if !more_mdns { // No more MDNs to send or one of them failed. return Ok(()); @@ -550,7 +578,7 @@ async fn send_mdn_rfc724_mid( context: &Context, rfc724_mid: &str, contact_id: ContactId, - smtp: &mut Smtp, + send_fn: impl AsyncFnOnce(&str, String, Vec) -> Result, ) -> Result { let contact = Contact::get_by_id(context, contact_id).await?; if contact.is_blocked() { @@ -590,48 +618,29 @@ async fn send_mdn_rfc724_mid( if context.get_config_bool(Config::BccSelf).await? { add_self_recipients(context, &mut recipients, encrypted).await?; } - let recipients: Vec<_> = recipients - .into_iter() - .filter_map(|addr| { - async_smtp::EmailAddress::new(addr.clone()) - .with_context(|| format!("Invalid recipient: {addr}")) - .log_err(context) - .ok() - }) - .collect(); message::insert_tombstone(context, &rendered_msg.rfc724_mid).await?; - match smtp_send(context, &recipients, &body, smtp, None).await { - SendResult::Success => { - if !recipients.is_empty() { - info!(context, "Successfully sent MDN for {rfc724_mid}."); - } - context - .sql - .transaction(|transaction| { - let mut stmt = - transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; - stmt.execute((rfc724_mid,))?; - for additional_rfc724_mid in additional_rfc724_mids { - stmt.execute((additional_rfc724_mid,))?; - } - Ok(()) - }) - .await?; - Ok(true) - } - SendResult::Retry => { - info!( - context, - "Temporary SMTP failure while sending an MDN for {rfc724_mid}." - ); - Ok(false) - } - SendResult::Failure(err) => Err(err), + let sent = send_fn(rfc724_mid, body, recipients).await?; + if sent { + context + .sql + .transaction(|transaction| { + let mut stmt = transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; + stmt.execute((rfc724_mid,))?; + for additional_rfc724_mid in additional_rfc724_mids { + stmt.execute((additional_rfc724_mid,))?; + } + Ok(()) + }) + .await?; } + Ok(sent) } /// Tries to send a single MDN. Returns true if more MDNs should be sent. -async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result { +pub(crate) async fn send_mdn( + context: &Context, + send_fn: impl AsyncFnOnce(&str, String, Vec) -> Result, +) -> Result { if !context.should_send_mdns().await? { context.sql.execute("DELETE FROM smtp_mdns", []).await?; return Ok(false); @@ -668,7 +677,7 @@ async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result { .await .context("Failed to update MDN retries count")?; - match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await { + match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, send_fn).await { Err(err) => { // If there is an error, for example there is no message corresponding to the msg_id in the // database, do not try to send this MDN again. diff --git a/src/test_utils.rs b/src/test_utils.rs index f5f245c8a3..cbbfad2024 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -43,7 +43,7 @@ use crate::mimeparser::{MimeMessage, SystemMessage}; use crate::pgp::SeipdVersion; use crate::receive_imf::{ReceivedMsg, receive_imf}; use crate::securejoin::{get_securejoin_qr, join_securejoin}; -use crate::smtp::msg_has_pending_smtp_job; +use crate::smtp::{self, msg_has_pending_smtp_job}; use crate::stock_str::StockStrings; use crate::tools::time; @@ -1080,6 +1080,22 @@ ORDER BY id" res } + pub async fn get_sent_mdn(&self) -> SentMessage<'_> { + let mut sent_mdn = None; + smtp::send_mdn(self, async |_rfc724_mid, body, recipients| { + sent_mdn = Some(SentMessage { + payload: body, + sender_msg_id: MsgId::new(u32::MAX), + sender_context: &self.ctx, + recipients: recipients.join(" "), + }); + Ok(true) + }) + .await + .expect("smtp::send_mdn"); + sent_mdn.unwrap() + } + pub async fn golden_test_chat(&self, chat_id: ChatId, filename: &str) { let filename = Path::new("test-data/golden/").join(filename); From 80f6c218f0288cefc552824eecbaf7f64699fe00 Mon Sep 17 00:00:00 2001 From: iequidoo Date: Thu, 21 May 2026 15:27:30 -0300 Subject: [PATCH 2/2] fix: Don't mark messages added to the db later as noticed when handling self-MDN The device which issued the MDN might not have these messages at that moment, so only mark messages having lower ids as noticed. Still, additionally filter messages by timestamp to protect from multi-relay message reordering and overall rely less on the server side. We assume that all clocks in the chat are synchronized. This doesn't fix the problem completely because messages may be received by devices in different order, but should fix it in most cases. --- src/chat.rs | 32 ++++++++++++----------- src/receive_imf.rs | 8 +++++- src/receive_imf/receive_imf_tests.rs | 38 ++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/src/chat.rs b/src/chat.rs index 015a82000c..ee0e88e56f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1,7 +1,7 @@ //! # Chat module. use std::cmp; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt; use std::io::Cursor; use std::marker::Sync; @@ -3329,39 +3329,41 @@ pub(crate) async fn mark_old_messages_as_noticed( return Ok(()); } - let mut msgs_by_chat: HashMap = HashMap::new(); + let mut updated_chats = BTreeMap::new(); for msg in msgs { - let chat_id = msg.chat_id; - if let Some(existing_msg) = msgs_by_chat.get(&chat_id) { - if msg.sort_timestamp > existing_msg.sort_timestamp { - msgs_by_chat.insert(chat_id, msg); - } - } else { - msgs_by_chat.insert(chat_id, msg); - } + let Some(&msg_id) = msg.msg_ids.last() else { + continue; + }; + updated_chats + .entry(msg.chat_id) + .and_modify(|val| *val = cmp::max(*val, (msg.sort_timestamp, msg_id))) + .or_insert((msg.sort_timestamp, msg_id)); } let changed_chats = context .sql .transaction(|transaction| { let mut changed_chats = Vec::new(); - for (_, msg) in msgs_by_chat { + for (chat_id, (timestamp, msg_id)) in updated_chats { let changed_rows = transaction.execute( + // Do the same as in receive_imf_inner(). "UPDATE msgs SET state=? WHERE state=? AND hidden=0 AND chat_id=? - AND timestamp<=?;", + AND timestamp<=? + AND id 0 { - changed_chats.push(msg.chat_id); + changed_chats.push(chat_id); } } Ok(changed_chats) diff --git a/src/receive_imf.rs b/src/receive_imf.rs index f5f9a05257..018da05615 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -977,12 +977,18 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1 context .sql .execute( + // Don't mark messages added to the db later as noticed, regardless of + // `timestamp` -- the device which issued the MDN might not have these + // messages at that moment. Still, additionally filter messages by timestamp + // to protect from multi-relay message reordering and overall rely less on + // the server side. We assume that all clocks in the chat are synchronized. " UPDATE msgs SET state=? WHERE state=? AND hidden=0 AND chat_id=? AND - (timestamp,id)<(?,?)", + timestamp<=? AND + id Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_self_mdn_vs_delayed_msg() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = &tcm.alice().await; + let bob1 = &tcm.bob().await; + let bob2 = &tcm.bob().await; + + let alice_chat = alice.create_chat(bob1).await; + let sent1 = alice.send_text(alice_chat.id, "1").await; + SystemTime::shift(Duration::from_secs(1)); + let sent2 = alice.send_text(alice_chat.id, "2").await; + + let msg2_id = bob1.recv_msg(&sent2).await.id; + let msg1_id = bob1.recv_msg(&sent1).await.id; + let bob2_chat_id = bob2.recv_msg(&sent2).await.chat_id; + bob2.recv_msg(&sent1).await; + + message::markseen_msgs(bob1, vec![msg2_id]).await?; + assert_eq!(msg1_id.get_state(bob1).await?, MessageState::InFresh); + let sent = bob1.get_sent_mdn().await; + + bob2.recv_msg_trash(&sent).await; + let mut msgs = get_chat_msgs(bob2, bob2_chat_id).await?; + let ChatItem::Message { msg_id } = msgs.pop().unwrap() else { + unreachable!(); + }; + let msg = Message::load_from_db(bob2, msg_id).await?; + assert_eq!(msg.text, "2"); + assert_eq!(msg.state, MessageState::InSeen); + let ChatItem::Message { msg_id } = msgs.pop().unwrap() else { + unreachable!(); + }; + let msg = Message::load_from_db(bob2, msg_id).await?; + assert_eq!(msg.text, "1"); + assert_eq!(msg.state, MessageState::InFresh); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_gmx_forwarded_msg() -> Result<()> { let t = TestContext::new_alice().await;