diff --git a/src/chat.rs b/src/chat.rs index 015a82000c..ee0e88e56f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1,7 +1,7 @@ //! # Chat module. use std::cmp; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt; use std::io::Cursor; use std::marker::Sync; @@ -3329,39 +3329,41 @@ pub(crate) async fn mark_old_messages_as_noticed( return Ok(()); } - let mut msgs_by_chat: HashMap = HashMap::new(); + let mut updated_chats = BTreeMap::new(); for msg in msgs { - let chat_id = msg.chat_id; - if let Some(existing_msg) = msgs_by_chat.get(&chat_id) { - if msg.sort_timestamp > existing_msg.sort_timestamp { - msgs_by_chat.insert(chat_id, msg); - } - } else { - msgs_by_chat.insert(chat_id, msg); - } + let Some(&msg_id) = msg.msg_ids.last() else { + continue; + }; + updated_chats + .entry(msg.chat_id) + .and_modify(|val| *val = cmp::max(*val, (msg.sort_timestamp, msg_id))) + .or_insert((msg.sort_timestamp, msg_id)); } let changed_chats = context .sql .transaction(|transaction| { let mut changed_chats = Vec::new(); - for (_, msg) in msgs_by_chat { + for (chat_id, (timestamp, msg_id)) in updated_chats { let changed_rows = transaction.execute( + // Do the same as in receive_imf_inner(). "UPDATE msgs SET state=? WHERE state=? AND hidden=0 AND chat_id=? - AND timestamp<=?;", + AND timestamp<=? + AND id 0 { - changed_chats.push(msg.chat_id); + changed_chats.push(chat_id); } } Ok(changed_chats) diff --git a/src/receive_imf.rs b/src/receive_imf.rs index f5f9a05257..018da05615 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -977,12 +977,18 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1 context .sql .execute( + // Don't mark messages added to the db later as noticed, regardless of + // `timestamp` -- the device which issued the MDN might not have these + // messages at that moment. Still, additionally filter messages by timestamp + // to protect from multi-relay message reordering and overall rely less on + // the server side. We assume that all clocks in the chat are synchronized. " UPDATE msgs SET state=? WHERE state=? AND hidden=0 AND chat_id=? AND - (timestamp,id)<(?,?)", + timestamp<=? AND + id Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_self_mdn_vs_delayed_msg() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = &tcm.alice().await; + let bob1 = &tcm.bob().await; + let bob2 = &tcm.bob().await; + + let alice_chat = alice.create_chat(bob1).await; + let sent1 = alice.send_text(alice_chat.id, "1").await; + SystemTime::shift(Duration::from_secs(1)); + let sent2 = alice.send_text(alice_chat.id, "2").await; + + let msg2_id = bob1.recv_msg(&sent2).await.id; + let msg1_id = bob1.recv_msg(&sent1).await.id; + let bob2_chat_id = bob2.recv_msg(&sent2).await.chat_id; + bob2.recv_msg(&sent1).await; + + message::markseen_msgs(bob1, vec![msg2_id]).await?; + assert_eq!(msg1_id.get_state(bob1).await?, MessageState::InFresh); + let sent = bob1.get_sent_mdn().await; + + bob2.recv_msg_trash(&sent).await; + let mut msgs = get_chat_msgs(bob2, bob2_chat_id).await?; + let ChatItem::Message { msg_id } = msgs.pop().unwrap() else { + unreachable!(); + }; + let msg = Message::load_from_db(bob2, msg_id).await?; + assert_eq!(msg.text, "2"); + assert_eq!(msg.state, MessageState::InSeen); + let ChatItem::Message { msg_id } = msgs.pop().unwrap() else { + unreachable!(); + }; + let msg = Message::load_from_db(bob2, msg_id).await?; + assert_eq!(msg.text, "1"); + assert_eq!(msg.state, MessageState::InFresh); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_gmx_forwarded_msg() -> Result<()> { let t = TestContext::new_alice().await; diff --git a/src/smtp.rs b/src/smtp.rs index 7f1bdf1dc8..cf983a18ab 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -492,7 +492,35 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> { return Ok(()); } - let more_mdns = send_mdn(context, connection).await?; + let more_mdns = send_mdn(context, async |rfc724_mid, body, recipients| { + let recipients: Vec<_> = recipients + .into_iter() + .filter_map(|addr| { + async_smtp::EmailAddress::new(addr.clone()) + .with_context(|| format!("Invalid recipient: {addr}")) + .log_err(context) + .ok() + }) + .collect(); + + match smtp_send(context, &recipients, &body, connection, None).await { + SendResult::Success => { + if !recipients.is_empty() { + info!(context, "Successfully sent MDN for {rfc724_mid}."); + } + Ok(true) + } + SendResult::Retry => { + info!( + context, + "Temporary SMTP failure while sending an MDN for {rfc724_mid}." + ); + Ok(false) + } + SendResult::Failure(err) => Err(err), + } + }) + .await?; if !more_mdns { // No more MDNs to send or one of them failed. return Ok(()); @@ -550,7 +578,7 @@ async fn send_mdn_rfc724_mid( context: &Context, rfc724_mid: &str, contact_id: ContactId, - smtp: &mut Smtp, + send_fn: impl AsyncFnOnce(&str, String, Vec) -> Result, ) -> Result { let contact = Contact::get_by_id(context, contact_id).await?; if contact.is_blocked() { @@ -590,48 +618,29 @@ async fn send_mdn_rfc724_mid( if context.get_config_bool(Config::BccSelf).await? { add_self_recipients(context, &mut recipients, encrypted).await?; } - let recipients: Vec<_> = recipients - .into_iter() - .filter_map(|addr| { - async_smtp::EmailAddress::new(addr.clone()) - .with_context(|| format!("Invalid recipient: {addr}")) - .log_err(context) - .ok() - }) - .collect(); message::insert_tombstone(context, &rendered_msg.rfc724_mid).await?; - match smtp_send(context, &recipients, &body, smtp, None).await { - SendResult::Success => { - if !recipients.is_empty() { - info!(context, "Successfully sent MDN for {rfc724_mid}."); - } - context - .sql - .transaction(|transaction| { - let mut stmt = - transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; - stmt.execute((rfc724_mid,))?; - for additional_rfc724_mid in additional_rfc724_mids { - stmt.execute((additional_rfc724_mid,))?; - } - Ok(()) - }) - .await?; - Ok(true) - } - SendResult::Retry => { - info!( - context, - "Temporary SMTP failure while sending an MDN for {rfc724_mid}." - ); - Ok(false) - } - SendResult::Failure(err) => Err(err), + let sent = send_fn(rfc724_mid, body, recipients).await?; + if sent { + context + .sql + .transaction(|transaction| { + let mut stmt = transaction.prepare("DELETE FROM smtp_mdns WHERE rfc724_mid = ?")?; + stmt.execute((rfc724_mid,))?; + for additional_rfc724_mid in additional_rfc724_mids { + stmt.execute((additional_rfc724_mid,))?; + } + Ok(()) + }) + .await?; } + Ok(sent) } /// Tries to send a single MDN. Returns true if more MDNs should be sent. -async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result { +pub(crate) async fn send_mdn( + context: &Context, + send_fn: impl AsyncFnOnce(&str, String, Vec) -> Result, +) -> Result { if !context.should_send_mdns().await? { context.sql.execute("DELETE FROM smtp_mdns", []).await?; return Ok(false); @@ -668,7 +677,7 @@ async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result { .await .context("Failed to update MDN retries count")?; - match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, smtp).await { + match send_mdn_rfc724_mid(context, &rfc724_mid, contact_id, send_fn).await { Err(err) => { // If there is an error, for example there is no message corresponding to the msg_id in the // database, do not try to send this MDN again. diff --git a/src/test_utils.rs b/src/test_utils.rs index f5f245c8a3..cbbfad2024 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -43,7 +43,7 @@ use crate::mimeparser::{MimeMessage, SystemMessage}; use crate::pgp::SeipdVersion; use crate::receive_imf::{ReceivedMsg, receive_imf}; use crate::securejoin::{get_securejoin_qr, join_securejoin}; -use crate::smtp::msg_has_pending_smtp_job; +use crate::smtp::{self, msg_has_pending_smtp_job}; use crate::stock_str::StockStrings; use crate::tools::time; @@ -1080,6 +1080,22 @@ ORDER BY id" res } + pub async fn get_sent_mdn(&self) -> SentMessage<'_> { + let mut sent_mdn = None; + smtp::send_mdn(self, async |_rfc724_mid, body, recipients| { + sent_mdn = Some(SentMessage { + payload: body, + sender_msg_id: MsgId::new(u32::MAX), + sender_context: &self.ctx, + recipients: recipients.join(" "), + }); + Ok(true) + }) + .await + .expect("smtp::send_mdn"); + sent_mdn.unwrap() + } + pub async fn golden_test_chat(&self, chat_id: ChatId, filename: &str) { let filename = Path::new("test-data/golden/").join(filename);