diff --git a/crates/enc-ffmpeg/src/base.rs b/crates/enc-ffmpeg/src/base.rs index 60179c8fa1c..ab0f5dc1a7a 100644 --- a/crates/enc-ffmpeg/src/base.rs +++ b/crates/enc-ffmpeg/src/base.rs @@ -13,6 +13,14 @@ pub struct EncoderBase { first_pts: Option, last_frame_pts: Option, last_written_dts: Option, + /// One-packet reorder buffer: a packet is written once its successor is + /// known so a synthesized duration can be replaced with the real dts + /// delta. Fragmenting muxers place each fragment at the accumulated + /// duration of the previous one, and the last sample of a fragment uses + /// the packet duration verbatim — a nominal duration there collapses any + /// capture gap that lands on a fragment cut, playing post-gap content + /// during the gap. The bool records whether the duration was synthesized. + held_packet: Option<(Packet, bool)>, } impl EncoderBase { @@ -23,6 +31,7 @@ impl EncoderBase { last_frame_pts: None, stream_index, last_written_dts: None, + held_packet: None, } } @@ -133,7 +142,8 @@ impl EncoderBase { _ => {} } - if self.packet.duration() <= 0 + let duration_synthesized = self.packet.duration() <= 0; + if duration_synthesized && let Some(duration) = nominal_packet_duration( output.stream(self.stream_index).unwrap().time_base(), encoder.frame_rate(), @@ -161,7 +171,18 @@ impl EncoderBase { } self.last_written_dts = self.packet.dts(); - self.packet.write_interleaved(output)?; + + let current = std::mem::replace(&mut self.packet, Packet::empty()); + if let Some((mut previous, previous_synthesized)) = self.held_packet.take() { + if previous_synthesized + && let (Some(prev_dts), Some(cur_dts)) = (previous.dts(), current.dts()) + && cur_dts > prev_dts + { + previous.set_duration(cur_dts - prev_dts); + } + previous.write_interleaved(output)?; + } + self.held_packet = Some((current, duration_synthesized)); } Ok(()) @@ -174,7 +195,13 @@ impl EncoderBase { ) -> Result<(), ffmpeg::Error> { encoder.send_eof()?; - self.process_packets(output, encoder) + self.process_packets(output, encoder)?; + + if let Some((previous, _)) = self.held_packet.take() { + previous.write_interleaved(output)?; + } + + Ok(()) } } diff --git a/crates/enc-ffmpeg/src/mux/segmented_stream.rs b/crates/enc-ffmpeg/src/mux/segmented_stream.rs index f6ec79014ad..a176d2c689a 100644 --- a/crates/enc-ffmpeg/src/mux/segmented_stream.rs +++ b/crates/enc-ffmpeg/src/mux/segmented_stream.rs @@ -328,7 +328,6 @@ impl SegmentedVideoEncoder { frame: frame::Video, timestamp: Duration, ) -> Result<(), QueueFrameError> { - let is_first_frame = self.segment_start_time.is_none(); let segment_start = match self.segment_start_time { Some(start) => start, None => { @@ -347,9 +346,10 @@ impl SegmentedVideoEncoder { .queue_frame(frame, timestamp, &mut self.output)?; self.frames_in_segment += 1; - if is_first_frame { - self.try_notify_init_segment(); - } + // The encoder holds one packet back to stamp real durations, so the + // init segment materializes a frame later than the first queue call; + // keep trying until it lands (no-op once notified). + self.try_notify_init_segment(); if !self.pending_segment_indices.is_empty() { self.frames_since_pending_flush += 1; @@ -1071,6 +1071,103 @@ mod tests { } } + // A capture gap whose post-gap frame lands on a segment cut: the dash + // muxer anchors each fragment at the accumulated duration of the previous + // one, and the last sample of a fragment takes its packet duration + // verbatim. Without real packet durations the first post-gap frame is + // pulled back onto the pre-gap timeline (one frame period after the last + // pre-gap frame) and its content plays DURING the gap — a multi-second + // desync for that frame and a collapsed hold for the viewer. + #[test] + fn gap_crossing_segment_cut_preserves_post_gap_pts() { + ffmpeg::init().ok(); + + let temp = tempfile::tempdir().unwrap(); + let base_path = temp.path().to_path_buf(); + + let mut encoder = SegmentedVideoEncoder::init( + base_path.clone(), + test_video_info(), + SegmentedVideoEncoderConfig { + segment_duration: Duration::from_millis(500), + ..Default::default() + }, + ) + .unwrap(); + + // ~30fps up to 0.4s, a 2.6s gap, then more frames. The post-gap + // frame is forced to a keyframe so the dash muxer cuts the segment + // exactly at the gap — the shape that loses the gap. + let pre_gap_ms: Vec = (0..12).map(|i| i * 33).collect(); + let post_gap_ms: Vec = (0..12).map(|i| 3000 + i * 33).collect(); + for &ts_ms in &pre_gap_ms { + let frame = create_test_frame(320, 240); + encoder + .queue_frame(frame, Duration::from_millis(ts_ms)) + .unwrap(); + } + for (i, &ts_ms) in post_gap_ms.iter().enumerate() { + let mut frame = create_test_frame(320, 240); + if i == 0 { + frame.set_kind(ffmpeg::picture::Type::I); + } + encoder + .queue_frame(frame, Duration::from_millis(ts_ms)) + .unwrap(); + } + + encoder.finish().unwrap(); + + let mut segment_paths: Vec = std::fs::read_dir(&base_path) + .unwrap() + .filter_map(|e| e.ok().map(|e| e.path())) + .filter(|p| p.extension().is_some_and(|ext| ext == "m4s")) + .collect(); + segment_paths.sort(); + + let concat_path = base_path.join("concat_test.mp4"); + let mut concatenated = std::fs::read(base_path.join(INIT_SEGMENT_NAME)).unwrap(); + for segment in &segment_paths { + concatenated.extend(std::fs::read(segment).unwrap()); + } + std::fs::write(&concat_path, concatenated).unwrap(); + + let mut input = format::input(&concat_path).unwrap(); + let stream_index = input + .streams() + .best(ffmpeg::media::Type::Video) + .unwrap() + .index(); + let time_base = input.stream(stream_index).unwrap().time_base(); + let tb = time_base.numerator() as f64 / time_base.denominator() as f64; + + let mut pts_secs: Vec = input + .packets() + .filter_map(|(stream, packet)| { + (stream.index() == stream_index) + .then_some(packet.pts()) + .flatten() + }) + .map(|pts| pts as f64 * tb) + .collect(); + pts_secs.sort_by(|a, b| a.partial_cmp(b).unwrap()); + + assert_eq!(pts_secs.len(), pre_gap_ms.len() + post_gap_ms.len()); + + let expected: Vec = pre_gap_ms + .iter() + .chain(post_gap_ms.iter()) + .map(|&ms| ms as f64 / 1000.0) + .collect(); + for (pts, expected) in pts_secs.iter().zip(&expected) { + assert!( + (pts - expected).abs() < 0.04, + "encoded pts {pts:.3}s should match capture timestamp {expected:.3}s \ + (all pts: {pts_secs:?})" + ); + } + } + #[test] fn manifest_updated_on_segment_boundary() { ffmpeg::init().ok(); @@ -1126,8 +1223,15 @@ mod tests { ) .unwrap(); - let frame = create_test_frame(320, 240); - encoder.queue_frame(frame, Duration::ZERO).unwrap(); + // The encoder holds one packet to stamp real durations (and hardware + // encoders add their own delay), so the init segment lands once + // enough frames have pushed the first packet through. + for i in 0..10u64 { + let frame = create_test_frame(320, 240); + encoder + .queue_frame(frame, Duration::from_millis(i * 33)) + .unwrap(); + } assert!(encoder.validate_init_segment().is_ok()); } diff --git a/crates/enc-ffmpeg/src/video/h264.rs b/crates/enc-ffmpeg/src/video/h264.rs index 1bda2b92288..789f12ba98e 100644 --- a/crates/enc-ffmpeg/src/video/h264.rs +++ b/crates/enc-ffmpeg/src/video/h264.rs @@ -51,6 +51,15 @@ pub enum H264EncoderError { PixFmtNotSupported(Pixel), #[error("Invalid output dimensions {width}x{height}; expected non-zero even width and height")] InvalidOutputDimensions { width: u32, height: u32 }, + #[error("Hardware encoder self-test failed: {0}")] + SelfTest(String), +} + +fn is_hardware_h264(codec_name: &str) -> bool { + matches!( + codec_name, + "h264_videotoolbox" | "h264_nvenc" | "h264_qsv" | "h264_amf" | "h264_mf" + ) } impl H264EncoderBuilder { @@ -155,6 +164,26 @@ impl H264EncoderBuilder { for (codec, encoder_options) in candidates { let codec_name = codec.name().to_string(); + if is_hardware_h264(&codec_name) + && let Err(reason) = cached_hardware_self_test( + codec, + &encoder_options, + &input_config, + output_width, + output_height, + self.bpp, + self.crf, + ) + { + warn!( + encoder = %codec_name, + %reason, + "Hardware H264 encoder failed pre-flight self-test, trying next candidate" + ); + last_error = Some(H264EncoderError::SelfTest(reason)); + continue; + } + match Self::build_with_codec( codec, encoder_options, @@ -167,10 +196,7 @@ impl H264EncoderBuilder { self.crf, ) { Ok(encoder) => { - let is_hardware = matches!( - codec_name.as_str(), - "h264_videotoolbox" | "h264_nvenc" | "h264_qsv" | "h264_amf" | "h264_mf" - ); + let is_hardware = is_hardware_h264(&codec_name); let fps = input_config.frame_rate.0 as f32 / input_config.frame_rate.1.max(1) as f32; if is_hardware { @@ -302,6 +328,27 @@ impl H264EncoderBuilder { for (codec, encoder_options) in candidates { let codec_name = codec.name().to_string(); + + if is_hardware_h264(&codec_name) + && let Err(reason) = cached_hardware_self_test( + codec, + &encoder_options, + &input_config, + output_width, + output_height, + self.bpp, + self.crf, + ) + { + warn!( + encoder = %codec_name, + %reason, + "Hardware H264 encoder failed pre-flight self-test, trying next candidate" + ); + last_error = Some(H264EncoderError::SelfTest(reason)); + continue; + } + match Self::build_standalone_with_codec( codec, encoder_options, @@ -1086,6 +1133,290 @@ fn get_codec_and_options( encoders } +/// Some GPU driver stacks open a hardware H264 session successfully but then +/// emit zeroed YUV — rendered as a solid green (or black) recording — with no +/// error surfaced anywhere. Before committing a recording to a hardware +/// encoder, encode a short neutral-gray clip and decode it back with the +/// software decoder; if the gray does not survive the round trip, the +/// candidate is rejected and encoder selection falls through to the next one +/// (terminating at libx264, which never takes this path). +/// +/// Windows-only: that is where the multi-vendor encoder/driver matrix +/// (nvenc/amf/qsv/mf) lives and where zeroed-output reports come from. +/// VideoToolbox is a single-vendor OS stack, and measured session creation +/// alone costs ~1.6s — too much to add to recording start for a failure mode +/// never observed there. Results are cached per everything that shapes the +/// encode path (encoder, resolution, input pixel format, frame rate, bitrate +/// inputs, and the full option set) so the cost is one-time per process, and +/// `CAP_DISABLE_ENCODER_SELF_TEST=1` bypasses the check as an escape hatch. +fn cached_hardware_self_test( + codec: Codec, + encoder_options: &Dictionary<'static>, + input_config: &VideoInfo, + output_width: u32, + output_height: u32, + bpp: f32, + crf: Option, +) -> Result<(), String> { + use std::{ + collections::HashMap, + fmt::Write, + sync::{Mutex, OnceLock}, + }; + + if !cfg!(target_os = "windows") { + return Ok(()); + } + + if std::env::var("CAP_DISABLE_ENCODER_SELF_TEST") + .is_ok_and(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes" | "on")) + { + return Ok(()); + } + + type Cache = Mutex>>; + static CACHE: OnceLock = OnceLock::new(); + let cache = CACHE.get_or_init(|| Mutex::new(HashMap::new())); + + // The verdict depends on everything that shapes the encoder session: + // options (which differ between recording and export), frame rate (the + // keyframe interval), the input pixel format (it selects the negotiated + // encode format), and the bitrate inputs — not just name and size. + let mut key = format!( + "{}|{}x{}|{:?}|{}/{}|{}|{:?}", + codec.name(), + output_width, + output_height, + input_config.pixel_format, + input_config.frame_rate.numerator(), + input_config.frame_rate.denominator(), + bpp, + crf, + ); + for (k, v) in encoder_options.iter() { + let _ = write!(key, "|{k}={v}"); + } + if let Some(result) = cache + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .get(&key) + { + return result.clone(); + } + + let started = std::time::Instant::now(); + let result = hardware_encoder_self_test( + codec, + clone_options(encoder_options), + input_config, + output_width, + output_height, + bpp, + crf, + ); + debug!( + encoder = %codec.name(), + width = output_width, + height = output_height, + elapsed_ms = started.elapsed().as_millis(), + ok = result.is_ok(), + "Hardware H264 encoder self-test finished" + ); + cache + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .insert(key, result.clone()); + result +} + +fn clone_options(options: &Dictionary<'static>) -> Dictionary<'static> { + let mut copy = Dictionary::new(); + for (k, v) in options.iter() { + copy.set(k, v); + } + copy +} + +fn hardware_encoder_self_test( + codec: Codec, + encoder_options: Dictionary<'static>, + input_config: &VideoInfo, + output_width: u32, + output_height: u32, + bpp: f32, + crf: Option, +) -> Result<(), String> { + // A throwaway encoder with production settings; frames are built directly + // in the negotiated output format, so no converter is needed. + let opened = open_video_encoder_inner( + codec, + encoder_options, + input_config, + output_width, + output_height, + bpp, + true, + crf, + false, + ) + .map_err(|e| format!("test encoder open: {e}"))?; + let mut encoder = opened.encoder; + + // Mid-gray is neutral in both RGB (128,128,128) and YUV (Y=128, U=V=128) + // representations, so filling every plane with 128 produces a valid gray + // frame in any format the encoder negotiated. + let mut test_frame = frame::Video::new(opened.output_format, output_width, output_height); + for i in 0..test_frame.planes() { + test_frame.data_mut(i).fill(128); + } + + let fps = { + let num = input_config.frame_rate.numerator().max(1); + let den = input_config.frame_rate.denominator().max(1); + (f64::from(num) / f64::from(den)).max(1.0) + }; + let pts_step = ((f64::from(input_config.time_base.denominator().max(1)) / fps) as i64).max(1); + + fn drain_packets(encoder: &mut encoder::Video, packets: &mut Vec) { + loop { + let mut packet = ffmpeg::Packet::empty(); + if encoder.receive_packet(&mut packet).is_err() { + break; + } + packets.push(packet); + } + } + + let mut packets = Vec::new(); + const TEST_FRAME_COUNT: i64 = 8; + for i in 0..TEST_FRAME_COUNT { + test_frame.set_pts(Some(i * pts_step)); + encoder + .send_frame(&test_frame) + .map_err(|e| format!("send_frame: {e}"))?; + drain_packets(&mut encoder, &mut packets); + } + encoder.send_eof().map_err(|e| format!("send_eof: {e}"))?; + drain_packets(&mut encoder, &mut packets); + + if packets.is_empty() { + return Err("encoder produced no packets".to_string()); + } + + let decoder_codec = ffmpeg::codec::decoder::find(ffmpeg::codec::Id::H264) + .ok_or_else(|| "no software H264 decoder available".to_string())?; + let mut decoder_ctx = context::Context::new_with_codec(decoder_codec); + // Hand over out-of-band SPS/PPS when the encoder produced extradata; + // in-band parameter sets (the common case without GLOBAL_HEADER) decode + // without it. + unsafe { + let enc = encoder.as_ptr(); + if !(*enc).extradata.is_null() && (*enc).extradata_size > 0 { + let size = (*enc).extradata_size as usize; + let buf = + ffmpeg::ffi::av_mallocz(size + ffmpeg::ffi::AV_INPUT_BUFFER_PADDING_SIZE as usize) + .cast::(); + if !buf.is_null() { + std::ptr::copy_nonoverlapping((*enc).extradata, buf, size); + let dec = decoder_ctx.as_mut_ptr(); + (*dec).extradata = buf; + (*dec).extradata_size = size as i32; + } + } + } + let mut decoder = decoder_ctx + .decoder() + .video() + .map_err(|e| format!("test decoder open: {e}"))?; + + let mut decoded = frame::Video::empty(); + let mut decoded_count = 0usize; + for packet in &packets { + if decoder.send_packet(packet).is_err() { + continue; + } + while decoder.receive_frame(&mut decoded).is_ok() { + decoded_count += 1; + verify_neutral_gray(&decoded)?; + } + } + let _ = decoder.send_eof(); + while decoder.receive_frame(&mut decoded).is_ok() { + decoded_count += 1; + verify_neutral_gray(&decoded)?; + } + + if decoded_count == 0 { + return Err(format!( + "none of {} encoded packets decoded to a frame", + packets.len() + )); + } + + Ok(()) +} + +/// Asserts a decoded self-test frame still carries the neutral-gray content +/// that was encoded. A zeroed-YUV frame (Y=U=V=0, rendered green) or a black +/// frame (Y=16) fails by a wide margin; any functioning encoder reproduces a +/// flat gray I-frame nearly losslessly. +fn verify_neutral_gray(frame: &frame::Video) -> Result<(), String> { + const TOLERANCE: f64 = 32.0; + + fn plane_mean(frame: &frame::Video, plane: usize, row_bytes: usize, rows: usize) -> f64 { + let stride = frame.stride(plane); + let data = frame.data(plane); + let mut sum = 0u64; + let mut count = 0u64; + for row in 0..rows { + let start = row * stride; + let Some(slice) = data.get(start..start + row_bytes) else { + break; + }; + sum += slice.iter().map(|&b| u64::from(b)).sum::(); + count += row_bytes as u64; + } + if count == 0 { + return f64::NAN; + } + sum as f64 / count as f64 + } + + let width = frame.width() as usize; + let height = frame.height() as usize; + let means: Vec = match frame.format() { + format::Pixel::YUV420P | format::Pixel::YUVJ420P => vec![ + plane_mean(frame, 0, width, height), + plane_mean(frame, 1, width.div_ceil(2), height.div_ceil(2)), + plane_mean(frame, 2, width.div_ceil(2), height.div_ceil(2)), + ], + format::Pixel::NV12 => vec![ + plane_mean(frame, 0, width, height), + plane_mean(frame, 1, width, height.div_ceil(2)), + ], + // Our encoders are configured for 8-bit 4:2:0; anything else decoding + // out of the test stream is itself evidence the session is not doing + // what was asked. Fail closed — the cost is falling back to the next + // candidate, never accepting output we could not verify. + other => { + return Err(format!( + "decoded self-test frame has unexpected pixel format {other:?}" + )); + } + }; + + for (plane, mean) in means.iter().enumerate() { + if !mean.is_finite() || (mean - 128.0).abs() > TOLERANCE { + return Err(format!( + "decoded plane {plane} mean {mean:.1} deviates from neutral gray 128 \ + (zeroed or corrupted encoder output)" + )); + } + } + + Ok(()) +} + fn get_bitrate(width: u32, height: u32, frame_rate: f32, bpp: f32) -> usize { // higher frame rates don't really need double the bitrate lets be real let frame_rate_multiplier = ((frame_rate as f64 - 30.0).max(0.0) * 0.6) + 30.0; @@ -1094,3 +1425,38 @@ fn get_bitrate(width: u32, height: u32, frame_rate: f32, bpp: f32) -> usize { (pixels_per_second * bpp as f64) as usize } + +#[cfg(test)] +mod self_test_tests { + use super::*; + + fn yuv420p_frame(y: u8, u: u8, v: u8) -> frame::Video { + let mut frame = frame::Video::new(format::Pixel::YUV420P, 64, 48); + frame.data_mut(0).fill(y); + frame.data_mut(1).fill(u); + frame.data_mut(2).fill(v); + frame + } + + #[test] + fn verifier_accepts_gray_and_rejects_zeroed_or_black() { + assert!(verify_neutral_gray(&yuv420p_frame(128, 128, 128)).is_ok()); + assert!(verify_neutral_gray(&yuv420p_frame(120, 132, 125)).is_ok()); + // Zeroed YUV is what renders as the solid-green display. + assert!(verify_neutral_gray(&yuv420p_frame(0, 0, 0)).is_err()); + // A black frame means the encoder dropped the content entirely. + assert!(verify_neutral_gray(&yuv420p_frame(16, 128, 128)).is_err()); + } + + #[test] + fn self_test_round_trip_passes_on_software_encoder() { + ffmpeg::init().ok(); + let codec = encoder::find_by_name("libx264").expect("libx264 available"); + let mut options = Dictionary::new(); + options.set("preset", "ultrafast"); + + let config = VideoInfo::from_raw(cap_media_info::RawVideoFormat::Bgra, 160, 120, 30); + hardware_encoder_self_test(codec, options, &config, 160, 120, 0.3, None) + .expect("healthy encoder passes the round trip"); + } +} diff --git a/crates/recording/src/output_pipeline/core.rs b/crates/recording/src/output_pipeline/core.rs index 298724048f4..3d8393c6bc4 100644 --- a/crates/recording/src/output_pipeline/core.rs +++ b/crates/recording/src/output_pipeline/core.rs @@ -28,6 +28,10 @@ use tracing::*; const CONSECUTIVE_ANOMALY_ERROR_THRESHOLD: u64 = 60; const LARGE_BACKWARD_JUMP_SECS: f64 = 1.0; const LARGE_FORWARD_JUMP_SECS: f64 = 2.0; +/// How far a timestamp may lead the pipeline wall clock before a forward jump +/// is treated as a source-clock glitch instead of a real delivery gap. Covers +/// driver timestamp skew and warmup baseline offset. +const FORWARD_JUMP_WALL_TOLERANCE_SECS: f64 = 0.3; const HEALTH_CHANNEL_CAPACITY: usize = 32; @@ -1048,6 +1052,7 @@ impl TimestampAnomalyTracker { &mut self, timestamp: Timestamp, timestamps: Timestamps, + wall_elapsed: Duration, ) -> Result { let now = Instant::now(); @@ -1073,7 +1078,7 @@ impl TimestampAnomalyTracker { { let jump_secs = forward_jump.as_secs_f64(); if jump_secs > LARGE_FORWARD_JUMP_SECS { - let result = self.handle_forward_jump(last, adjusted, jump_secs, now); + let result = self.handle_forward_jump(last, adjusted, jump_secs, now, wall_elapsed); self.last_valid_wall_clock = Some(now); return result; } @@ -1156,11 +1161,21 @@ impl TimestampAnomalyTracker { current: Duration, jump_secs: f64, now: Instant, + wall_elapsed: Duration, ) -> Result { - let wall_clock_confirmed = self.last_valid_wall_clock.is_some_and(|last_wc| { + let arrival_confirmed = self.last_valid_wall_clock.is_some_and(|last_wc| { let wall_clock_gap_secs = now.duration_since(last_wc).as_secs_f64(); wall_clock_gap_secs >= jump_secs * 0.5 }); + // A frame captured in real time can never be stamped ahead of the + // wall clock, so a jump landing at-or-behind it is a real delivery + // gap even when downstream queueing bunched the arrivals together + // (a loaded encoder drains the pre-gap backlog and the post-gap + // frame back-to-back, defeating the arrival-spacing check above). + // Only future-stamped jumps are source-clock glitches. + let within_wall_clock = + current.as_secs_f64() <= wall_elapsed.as_secs_f64() + FORWARD_JUMP_WALL_TOLERANCE_SECS; + let wall_clock_confirmed = arrival_confirmed || within_wall_clock; self.total_forward_skew_secs += jump_secs; if jump_secs > self.max_forward_skew_secs { @@ -2072,7 +2087,12 @@ fn spawn_video_encoder, TVideo: V + remap.duration().saturating_sub(total_pause_duration), ); - let raw_duration = match anomaly_tracker.process_timestamp(remapped_ts, timestamps) { + let wall_clock_elapsed = timestamps + .instant() + .elapsed() + .saturating_sub(total_pause_duration); + + let raw_duration = match anomaly_tracker.process_timestamp(remapped_ts, timestamps, wall_clock_elapsed) { Ok(d) => d, Err(TimestampAnomalyError::TooManyConsecutiveAnomalies { count }) => { return Err(anyhow!( @@ -2088,9 +2108,6 @@ fn spawn_video_encoder, TVideo: V "Timeline resync detected (anomaly collapsed jump); wall-clock anchor covers the gap" ); } - - let raw_wall_clock = timestamps.instant().elapsed(); - let wall_clock_elapsed = raw_wall_clock.saturating_sub(total_pause_duration); let duration = drift_tracker.calculate_timestamp(raw_duration, wall_clock_elapsed); timestamp_span.record(duration); @@ -2174,21 +2191,25 @@ fn spawn_video_encoder, TVideo: V .saturating_sub(shared_pause.total_pause_duration()), ); - let raw_duration = - match anomaly_tracker.process_timestamp(remapped_ts, timestamps) { - Ok(d) => d, - Err(_) => { - warn!("Timestamp anomaly during drain, skipping frame"); - skipped += 1; - continue; - } - }; + let wall_clock_elapsed = timestamps + .instant() + .elapsed() + .saturating_sub(shared_pause.total_pause_duration()); + + let raw_duration = match anomaly_tracker.process_timestamp( + remapped_ts, + timestamps, + wall_clock_elapsed, + ) { + Ok(d) => d, + Err(_) => { + warn!("Timestamp anomaly during drain, skipping frame"); + skipped += 1; + continue; + } + }; let _ = anomaly_tracker.take_resync_flag(); - - let raw_wall_clock = timestamps.instant().elapsed(); - let total_pause = shared_pause.total_pause_duration(); - let wall_clock_elapsed = raw_wall_clock.saturating_sub(total_pause); let duration = drift_tracker.calculate_timestamp(raw_duration, wall_clock_elapsed); timestamp_span.record(duration); @@ -3848,7 +3869,9 @@ mod tests { for i in 0..10u64 { let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); - tracker.process_timestamp(ts, timestamps).unwrap(); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); } assert_eq!(tracker.anomaly_count, 0); @@ -3864,7 +3887,9 @@ mod tests { for i in 0..5u64 { let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); - tracker.process_timestamp(ts, timestamps).unwrap(); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); } assert_eq!(tracker.anomaly_count, 0); @@ -3872,7 +3897,9 @@ mod tests { tracker.last_valid_wall_clock = Instant::now().checked_sub(Duration::from_secs(3)); let jump_ts = make_timestamp(timestamps, Duration::from_millis(4 * 33 + 3000)); - tracker.process_timestamp(jump_ts, timestamps).unwrap(); + tracker + .process_timestamp(jump_ts, timestamps, Duration::from_millis(4 * 33)) + .unwrap(); assert_eq!(tracker.anomaly_count, 0); assert_eq!(tracker.wall_clock_confirmed_jumps, 1); @@ -3886,13 +3913,17 @@ mod tests { for i in 0..5u64 { let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); - tracker.process_timestamp(ts, timestamps).unwrap(); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); } assert_eq!(tracker.anomaly_count, 0); let jump_ts = make_timestamp(timestamps, Duration::from_millis(4 * 33 + 3000)); - tracker.process_timestamp(jump_ts, timestamps).unwrap(); + tracker + .process_timestamp(jump_ts, timestamps, Duration::from_millis(4 * 33)) + .unwrap(); assert_eq!(tracker.anomaly_count, 1); assert_eq!(tracker.wall_clock_confirmed_jumps, 0); @@ -3906,13 +3937,17 @@ mod tests { for i in 0..5u64 { let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); - tracker.process_timestamp(ts, timestamps).unwrap(); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); } tracker.last_valid_wall_clock = Instant::now().checked_sub(Duration::from_secs(3)); let jump_ts = make_timestamp(timestamps, Duration::from_millis(4 * 33 + 3000)); - let accepted = tracker.process_timestamp(jump_ts, timestamps).unwrap(); + let accepted = tracker + .process_timestamp(jump_ts, timestamps, Duration::from_millis(4 * 33)) + .unwrap(); // A wall-clock-confirmed jump is a real gap in frame delivery and // passes through unmodified — it is not a resync. @@ -3927,7 +3962,13 @@ mod tests { let next_ts = make_timestamp(timestamps, Duration::from_millis(4 * 33 + 3000 + 33 + 3000)); - tracker.process_timestamp(next_ts, timestamps).unwrap(); + tracker + .process_timestamp( + next_ts, + timestamps, + Duration::from_millis(4 * 33 + 3000 + 33), + ) + .unwrap(); assert!( tracker.take_resync_flag(), @@ -3944,23 +3985,35 @@ mod tests { for i in 0..3u64 { let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); - tracker.process_timestamp(ts, timestamps).unwrap(); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); } tracker.last_valid_wall_clock = Instant::now().checked_sub(Duration::from_secs(3)); let jump1 = make_timestamp(timestamps, Duration::from_millis(2 * 33 + 3000)); - tracker.process_timestamp(jump1, timestamps).unwrap(); + tracker + .process_timestamp(jump1, timestamps, Duration::from_millis(2 * 33)) + .unwrap(); tracker.take_resync_flag(); let normal = make_timestamp(timestamps, Duration::from_millis(2 * 33 + 3000 + 33)); - tracker.process_timestamp(normal, timestamps).unwrap(); + tracker + .process_timestamp( + normal, + timestamps, + Duration::from_millis(2 * 33 + 3000 + 33), + ) + .unwrap(); tracker.last_valid_wall_clock = Instant::now().checked_sub(Duration::from_secs(5)); let jump2 = make_timestamp(timestamps, Duration::from_millis(2 * 33 + 3000 + 66 + 5000)); - tracker.process_timestamp(jump2, timestamps).unwrap(); + tracker + .process_timestamp(jump2, timestamps, Duration::from_millis(2 * 33 + 3000 + 66)) + .unwrap(); assert_eq!(tracker.anomaly_count, 0); assert_eq!(tracker.wall_clock_confirmed_jumps, 2); @@ -3970,6 +4023,68 @@ mod tests { ); } + // A loaded encoder can drain the pre-gap backlog and the post-gap + // frame back-to-back, so the arrival-spacing heuristic sees no wall + // gap even though the capture timestamps carry a real >2s delivery + // gap. The timestamps staying at-or-behind the wall clock is what + // proves the gap real; collapsing it here permanently desynced any + // recording whose gap began before the drift anchor existed. + #[test] + fn bunched_real_gap_behind_wall_clock_is_accepted() { + let mut tracker = TimestampAnomalyTracker::new("test"); + let timestamps = make_timestamps(); + + // Frames 0..0.5s processed in a burst (arrival gaps ~0). + for i in 0..15u64 { + let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); + } + + // The post-gap frame arrives immediately after (bunched), but its + // timestamp (4.9s) is behind the wall clock (5.0s): a real gap. + let jump_ts = make_timestamp(timestamps, Duration::from_millis(4900)); + let accepted = tracker + .process_timestamp(jump_ts, timestamps, Duration::from_millis(5000)) + .unwrap(); + + assert!( + (accepted.as_secs_f64() - 4.9).abs() < 0.05, + "real gap must pass through, got {accepted:?}" + ); + assert_eq!(tracker.anomaly_count, 0); + assert_eq!(tracker.wall_clock_confirmed_jumps, 1); + } + + // The inverse case: a timestamp landing ahead of the wall clock can + // only be a source-clock glitch — no real frame is stamped in the + // future — so it must still be collapsed, bunched arrival or not. + #[test] + fn future_stamped_jump_is_still_collapsed() { + let mut tracker = TimestampAnomalyTracker::new("test"); + let timestamps = make_timestamps(); + + for i in 0..15u64 { + let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); + } + + let jump_ts = make_timestamp(timestamps, Duration::from_millis(4900)); + let collapsed = tracker + .process_timestamp(jump_ts, timestamps, Duration::from_millis(15 * 33)) + .unwrap(); + + assert!( + collapsed.as_secs_f64() < 1.0, + "future-stamped glitch must be collapsed, got {collapsed:?}" + ); + assert_eq!(tracker.anomaly_count, 1); + assert_eq!(tracker.wall_clock_confirmed_jumps, 0); + } + #[test] fn wall_clock_start_set_on_first_frame() { let mut tracker = TimestampAnomalyTracker::new("test"); @@ -3978,7 +4093,9 @@ mod tests { assert!(tracker.wall_clock_start.is_none()); let ts = make_timestamp(timestamps, Duration::ZERO); - tracker.process_timestamp(ts, timestamps).unwrap(); + tracker + .process_timestamp(ts, timestamps, Duration::ZERO) + .unwrap(); assert!(tracker.wall_clock_start.is_some()); } @@ -3990,13 +4107,17 @@ mod tests { for i in 0..3u64 { let ts = make_timestamp(timestamps, Duration::from_millis(i * 33)); - tracker.process_timestamp(ts, timestamps).unwrap(); + tracker + .process_timestamp(ts, timestamps, Duration::from_millis(i * 33)) + .unwrap(); } tracker.last_valid_wall_clock = Instant::now().checked_sub(Duration::from_secs(3)); let jump_ts = make_timestamp(timestamps, Duration::from_millis(2 * 33 + 3000)); - tracker.process_timestamp(jump_ts, timestamps).unwrap(); + tracker + .process_timestamp(jump_ts, timestamps, Duration::from_millis(2 * 33)) + .unwrap(); assert_eq!(tracker.wall_clock_confirmed_jumps, 1); assert_eq!(tracker.anomaly_count, 0); @@ -4017,7 +4138,9 @@ mod tests { ) -> f64 { let remapped = Timestamp::Instant(timestamps.instant() + Duration::from_secs_f64(source_secs)); - let raw = anomaly.process_timestamp(remapped, timestamps).unwrap(); + let raw = anomaly + .process_timestamp(remapped, timestamps, Duration::from_secs_f64(wall_secs)) + .unwrap(); let _ = anomaly.take_resync_flag(); drift .calculate_timestamp(raw, Duration::from_secs_f64(wall_secs)) diff --git a/crates/recording/tests/sync_matrix.rs b/crates/recording/tests/sync_matrix.rs index d7b08a3d194..431b511de26 100644 --- a/crates/recording/tests/sync_matrix.rs +++ b/crates/recording/tests/sync_matrix.rs @@ -29,9 +29,10 @@ use cap_timestamp::{Timestamp, Timestamps}; use serde::Serialize; const CONTENT_SECS: f64 = 4.0; -/// Absolute tolerance for a muxed pts vs the sent capture timestamp. Covers -/// warmup anchoring, emission jitter and encoder rounding, plus scheduler -/// noise on shared CI runners. +/// Absolute tolerance for a muxed pts vs the sent capture timestamp, +/// measured from each side's own origin (first sent frame vs first muxed +/// pts). Covers warmup anchoring, emission jitter and encoder rounding, +/// plus scheduler noise on shared CI runners. const ABS_TOLERANCE_SECS: f64 = 0.25; /// Tolerance for the relative structure (pts deltas vs sent deltas), which is /// what actually determines sync drift. The bug class this guards against @@ -304,9 +305,18 @@ async fn run_video_case(case: VideoCase) -> Result { // floor in frames as well. The bug class this guards produces errors of // a second or more either way. let rel_tolerance = REL_TOLERANCE_SECS.max(2.5 / f64::from(case.fps)); + // The muxed timeline's origin is the first DELIVERED frame: the pipeline + // zeroes each track at its first frame and the recorder persists the + // track's start_time for cross-track alignment. A random case whose + // leading frames were dropped therefore legitimately muxes pts starting + // near 0, not at sent[0]; compare against the origin-normalized sent + // timeline. The fixed absolute bound stays valid at any frame rate the + // generator produces (fps >= 10 keeps rel_tolerance <= ABS_TOLERANCE): + // a constant whole-track offset does not scale with frame period. + let sent_origin = sent[0]; for (i, (&p, &s)) in pts.iter().zip(&sent).enumerate() { - max_abs = max_abs.max((p - s).abs()); - let rel = ((p - pts[0]) - (s - sent[0])).abs(); + max_abs = max_abs.max((p - (s - sent_origin)).abs()); + let rel = ((p - pts[0]) - (s - sent_origin)).abs(); max_rel = max_rel.max(rel); if rel > rel_tolerance { return Err(format!( diff --git a/crates/rendering/src/cursor_interpolation.rs b/crates/rendering/src/cursor_interpolation.rs index 0226a440108..9275c93446f 100644 --- a/crates/rendering/src/cursor_interpolation.rs +++ b/crates/rendering/src/cursor_interpolation.rs @@ -15,6 +15,21 @@ const DECIMATE_FPS: f64 = 60.0; const DECIMATE_MIN_DIST_UV: f64 = 1.0 / 1920.0; const SIMULATION_STEP_MS: f64 = 1000.0 / 60.0; const SPRING_SETTLE_EXTRA_MS: f64 = 300.0; +/// Per-step one-pole coefficient for adapting the phase lead when the active +/// spring profile changes; ~130ms time constant at the 60Hz simulation step, +/// so profile switches ease in without popping the target. +const LEAD_SMOOTHING: f64 = 0.12; + +/// A spring-mass-damper chasing a moving target trails it by friction/tension +/// seconds at steady state (independent of mass). Sampling the target that far +/// ahead cancels the trail so the smoothed cursor sits where the real cursor +/// was at render time instead of visibly lagging behind the video. +fn spring_lag_ms(config: &SpringMassDamperSimulationConfig) -> f64 { + if config.tension <= 0.0 { + return 0.0; + } + f64::from(config.friction / config.tension) * 1000.0 +} const DEFAULT_CLICK_SPRING: SpringMassDamperSimulationConfig = SpringMassDamperSimulationConfig { tension: 530.0, @@ -169,6 +184,9 @@ fn position_at_time_hinted( time_ms: f64, hint: &mut usize, ) -> (f64, f64) { + while *hint > 0 && moves[*hint].time_ms > time_ms { + *hint -= 1; + } while *hint + 1 < moves.len() && moves[*hint + 1].time_ms <= time_ms { *hint += 1; } @@ -362,7 +380,8 @@ fn build_smoothed_timeline( let capacity = ((settle_end / SIMULATION_STEP_MS).ceil() as usize) + 2; let mut events = Vec::with_capacity(capacity); - let mut move_hint: usize = 0; + let mut target_hint: usize = 0; + let mut cid_hint: usize = 0; events.push(SmoothedCursorEvent { time: 0.0, @@ -372,12 +391,23 @@ fn build_smoothed_timeline( }); let mut t_ms = SIMULATION_STEP_MS; + let mut lead_ms = spring_lag_ms(&smoothing_config); while t_ms <= settle_end { let clamped_t = t_ms.min(end_time_ms); - let (cx, cy) = position_at_time_hinted(moves, clamped_t, &mut move_hint); - let cid = cursor_id_at_time(moves, clamped_t, move_hint).to_string(); + context.advance_to(t_ms); + let config = presets.config(context.profile(t_ms)); + sim.set_config(config); + lead_ms += (spring_lag_ms(&config) - lead_ms) * LEAD_SMOOTHING; + + // The spring's target leads the raw path by the profile's own lag so + // the smoothed output lands on the real cursor position at time t. + // The drawn cursor icon must not lead: it samples the raw timeline. + let lead_t = (clamped_t + lead_ms).min(end_time_ms); + let (cx, cy) = position_at_time_hinted(moves, lead_t, &mut target_hint); + let _ = position_at_time_hinted(moves, clamped_t, &mut cid_hint); + let cid = cursor_id_at_time(moves, clamped_t, cid_hint).to_string(); let target = if let Some(click) = next_click_within(&cursor.clicks, t_ms, CLICK_LOOKAHEAD_TARGET_MS) @@ -390,9 +420,6 @@ fn build_smoothed_timeline( sim.set_target_position(target); - context.advance_to(t_ms); - sim.set_config(presets.config(context.profile(t_ms))); - sim.run(SIMULATION_STEP_MS as f32); events.push(SmoothedCursorEvent { @@ -660,6 +687,50 @@ mod tests { assert_eq!(context.profile(100.0), SpringProfile::Default); } + #[test] + fn smoothed_cursor_tracks_moving_target_without_lag() { + // Constant-velocity motion: 0.2 UV/s along x for 3 seconds, sampled + // every 10ms like the real recorder. + let velocity_uv_per_ms = 0.0002; + let moves: Vec<_> = (0..=300) + .map(|i| { + let t = i as f64 * 10.0; + cursor_move(t, 0.1 + t * velocity_uv_per_ms, 0.5) + }) + .collect(); + let cursor = CursorEvents { + moves, + clicks: vec![], + }; + + let smoothing = SpringMassDamperSimulationConfig { + tension: 470.0, + mass: 3.0, + friction: 70.0, + }; + + // Without phase-lead compensation the spring trails a moving target + // by friction/tension = 149ms, i.e. ~0.030 UV at this velocity. The + // compensated output must sit within a couple of simulation steps of + // the true position throughout steady-state motion. + for t_ms in [1000.0f64, 1500.0, 2000.0, 2500.0] { + let smoothed = interpolate_cursor_with_click_spring( + &cursor, + (t_ms / 1000.0) as f32, + Some(smoothing), + None, + ) + .unwrap(); + let expected_x = 0.1 + t_ms * velocity_uv_per_ms; + let err = (smoothed.position.coord.x - expected_x).abs(); + assert!( + err < 0.01, + "smoothed cursor off by {err:.4} UV ({:.0}ms of motion) at t={t_ms}ms", + err / velocity_uv_per_ms + ); + } + } + #[test] fn smoothed_timeline_has_no_jumps() { let moves = vec![