From d940361a8e904cc850eb84b0a5669424d94108d3 Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Fri, 26 Jun 2026 23:57:28 +0500 Subject: [PATCH 1/3] relay: decode DYNAMIC_GROUPS once at ingestion, not per request MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The §10.2.13 NEW_GROUP_REQUEST path called trackSupportsDynamicGroups on every request, which parsed the entire Track Properties block, pulled out the one DYNAMIC_GROUPS field, and discarded the rest. Track Properties are immutable for the entry's lifetime (§9.6, first-setter-wins), so the decode only needs to happen once. Decode DYNAMIC_GROUPS when Properties is set (centralized through a new setPropertiesLocked) and cache the (bool, error) on the TrackEntry, exposed via DynamicGroups(). The raw bytes are still stored verbatim for the §9.6 opaque downstream passthrough; only the one field the relay semantically acts on is now decoded eagerly. The §12.6 PROTOCOL_VIOLATION (DYNAMIC_GROUPS value > 1) is cached alongside so the caller's decline behavior is unchanged. trackSupportsDynamicGroups moves into registry as parseDynamicGroups, emptying newgroup.go (newGroupRequestValue was already inlined), so both it and newgroup_internal_test.go are deleted; their tests move to registry's dynamic_groups_test.go, with the dynamic-groups test rewritten to exercise the public SetProperties / DynamicGroups path. Co-Authored-By: Claude Opus 4.8 --- pkg/relay/handler_subscribe.go | 2 +- .../registry/dynamic_groups_test.go} | 22 +++++-- pkg/relay/internal/registry/track.go | 61 ++++++++++++++++++- pkg/relay/newgroup.go | 33 ---------- 4 files changed, 76 insertions(+), 42 deletions(-) rename pkg/relay/{newgroup_internal_test.go => internal/registry/dynamic_groups_test.go} (83%) delete mode 100644 pkg/relay/newgroup.go diff --git a/pkg/relay/handler_subscribe.go b/pkg/relay/handler_subscribe.go index 5109aa8..ed0637e 100644 --- a/pkg/relay/handler_subscribe.go +++ b/pkg/relay/handler_subscribe.go @@ -283,7 +283,7 @@ func (h *sessionHandler) propagateNewGroupUpstream( return } - dynamic, err := trackSupportsDynamicGroups(entry.GetProperties()) + dynamic, err := entry.DynamicGroups() if err != nil { // §12.6: a DYNAMIC_GROUPS value > 1 is a protocol violation by the // upstream publisher. Scope the failure to declining the request diff --git a/pkg/relay/newgroup_internal_test.go b/pkg/relay/internal/registry/dynamic_groups_test.go similarity index 83% rename from pkg/relay/newgroup_internal_test.go rename to pkg/relay/internal/registry/dynamic_groups_test.go index 9d5b9eb..ada1329 100644 --- a/pkg/relay/newgroup_internal_test.go +++ b/pkg/relay/internal/registry/dynamic_groups_test.go @@ -1,4 +1,4 @@ -package relay +package registry_test import ( "testing" @@ -15,33 +15,43 @@ func dynamicGroupsProps(t *testing.T, value uint64) []byte { }) } -func TestTrackSupportsDynamicGroups(t *testing.T) { +// TestTrackEntry_DynamicGroups pins the §12.6 DYNAMIC_GROUPS decode that +// SetProperties performs once and caches: an absent property and value 0 are +// false, value 1 is true, and value > 1 is a PROTOCOL_VIOLATION surfaced as an +// error. +func TestTrackEntry_DynamicGroups(t *testing.T) { t.Parallel() + setProps := func(props []byte) *registry.TrackEntry { + e := ®istry.TrackEntry{} + e.SetProperties(props) + return e + } + t.Run("absent is false", func(t *testing.T) { t.Parallel() - got, err := trackSupportsDynamicGroups(nil) + got, err := setProps(nil).DynamicGroups() if err != nil || got { t.Fatalf("got (%v, %v), want (false, nil)", got, err) } }) t.Run("value 0 is false", func(t *testing.T) { t.Parallel() - got, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 0)) + got, err := setProps(dynamicGroupsProps(t, 0)).DynamicGroups() if err != nil || got { t.Fatalf("got (%v, %v), want (false, nil)", got, err) } }) t.Run("value 1 is true", func(t *testing.T) { t.Parallel() - got, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 1)) + got, err := setProps(dynamicGroupsProps(t, 1)).DynamicGroups() if err != nil || !got { t.Fatalf("got (%v, %v), want (true, nil)", got, err) } }) t.Run("value > 1 is an error (§12.6)", func(t *testing.T) { t.Parallel() - if _, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 2)); err == nil { + if _, err := setProps(dynamicGroupsProps(t, 2)).DynamicGroups(); err == nil { t.Fatal("got nil error, want §12.6 protocol-violation error") } }) diff --git a/pkg/relay/internal/registry/track.go b/pkg/relay/internal/registry/track.go index 51cd8d0..02d80ae 100644 --- a/pkg/relay/internal/registry/track.go +++ b/pkg/relay/internal/registry/track.go @@ -14,6 +14,7 @@ package registry import ( "context" + "fmt" "log/slog" "slices" "sync" @@ -87,6 +88,17 @@ type TrackEntry struct { // block; the relay treats them opaquely. Properties []byte + // dynamicGroups caches the one Track Property the relay acts on rather + // than forwards: DYNAMIC_GROUPS (§12.6), needed by the §10.2.13 + // NEW_GROUP_REQUEST path. Properties are immutable for the entry's + // lifetime (§9.6, first-setter-wins), so the value is decoded once when + // Properties is set rather than re-parsing the whole block on every + // request. dynamicGroupsErr holds a §12.6 PROTOCOL_VIOLATION (a + // DYNAMIC_GROUPS value > 1) so the caller can decline the request. Both + // are set together by setPropertiesLocked. + dynamicGroups bool + dynamicGroupsErr error + // LargestObject is the (Group, Object) high-water mark observed for // this track, updated by the fanout path on every incoming object and // by upstream control messages that carry a LARGEST_OBJECT value. §10.2.11 @@ -547,7 +559,7 @@ func (r *TrackRegistry) AddUpstream( // captured by a prior caller — §9.6 expects them to be // stable for the lifetime of the track entry, so the first // setter wins. - entry.Properties = conf.properties + entry.setPropertiesLocked(conf.properties) } if becameNonEmpty { r.publishTrackToDiscovery(entry) @@ -978,10 +990,19 @@ func (e *TrackEntry) UpdateLargestAndDetectNew( // be replayed verbatim.) func (e *TrackEntry) SetProperties(props []byte) { e.mu.Lock() - e.Properties = props + e.setPropertiesLocked(props) e.mu.Unlock() } +// setPropertiesLocked stores the raw Properties bytes and decodes the one +// field the relay acts on (DYNAMIC_GROUPS) in the same step, so the cached +// dynamicGroups value never drifts from the raw bytes. Callers must hold +// e.mu. +func (e *TrackEntry) setPropertiesLocked(props []byte) { + e.Properties = props + e.dynamicGroups, e.dynamicGroupsErr = parseDynamicGroups(props) +} + // GetProperties returns the raw Track Properties captured from the upstream // publisher. The returned slice is the same byte buffer stored on the entry; // callers MUST NOT mutate it. @@ -991,6 +1012,42 @@ func (e *TrackEntry) GetProperties() []byte { return e.Properties } +// DynamicGroups reports whether the track advertised DYNAMIC_GROUPS=1 (§12.6), +// using the value decoded once when Properties was set. The error is a §12.6 +// PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1) by the upstream publisher; +// the §10.2.13 caller declines the NEW_GROUP_REQUEST rather than acting on it. +func (e *TrackEntry) DynamicGroups() (bool, error) { + e.mu.RLock() + defer e.mu.RUnlock() + return e.dynamicGroups, e.dynamicGroupsErr +} + +// parseDynamicGroups parses raw Track Properties and reports whether the track +// advertised DYNAMIC_GROUPS=1 (§12.6). An absent property reports false. A +// value greater than 1 is a PROTOCOL_VIOLATION per §12.6 and surfaces as an +// error so the caller can decline to act on it. +func parseDynamicGroups(props []byte) (bool, error) { + pairs, err := message.ParseTrackProperties(props) + if err != nil { + return false, err + } + for _, kv := range pairs { + if kv.Type != message.PropertyDynamicGroups { + continue + } + switch kv.IntVal { + case 0: + return false, nil + case 1: + return true, nil + default: + return false, fmt.Errorf( + "relay: DYNAMIC_GROUPS value %d > 1 (§12.6 PROTOCOL_VIOLATION)", kv.IntVal) + } + } + return false, nil +} + // CopyUpstream returns a snapshot of the current upstream slice. Callers // that want to iterate without holding the entry lock for the whole // iteration use this so they don't have to coordinate with mutators. diff --git a/pkg/relay/newgroup.go b/pkg/relay/newgroup.go deleted file mode 100644 index bec1859..0000000 --- a/pkg/relay/newgroup.go +++ /dev/null @@ -1,33 +0,0 @@ -package relay - -import ( - "fmt" - - "github.com/floatdrop/moq-go/pkg/moqt/message" -) - -// trackSupportsDynamicGroups parses raw Track Properties and reports whether -// the track advertised DYNAMIC_GROUPS=1 (§12.6). An absent property reports -// false. A value greater than 1 is a PROTOCOL_VIOLATION per §12.6 and surfaces -// as an error so the caller can decline to act on it. -func trackSupportsDynamicGroups(props []byte) (bool, error) { - pairs, err := message.ParseTrackProperties(props) - if err != nil { - return false, err - } - for _, kv := range pairs { - if kv.Type != message.PropertyDynamicGroups { - continue - } - switch kv.IntVal { - case 0: - return false, nil - case 1: - return true, nil - default: - return false, fmt.Errorf( - "relay: DYNAMIC_GROUPS value %d > 1 (§12.6 PROTOCOL_VIOLATION)", kv.IntVal) - } - } - return false, nil -} From 77c497d443e80a2f2f400ee9b7aac77f0af17969 Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Sat, 27 Jun 2026 00:03:21 +0500 Subject: [PATCH 2/3] relay: parse Track Properties block once, dispatch per property MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous parseDynamicGroups walked the whole Track Properties block to extract one field. Caching a second property that way would mean either a second full-block walk or a copy-pasted parse loop — an easy trap. Restructure so the block is parsed exactly once into a decodedProperties value: decodeTrackProperties walks the KV pairs and dispatches each property the relay acts on to a small per-value decoder. Adding a cached property is now a field + a switch case + an accessor, never a second pass over the bytes. A structural block-parse failure is recorded once as parseErr and surfaced by every accessor, so per-property error handling doesn't repeat it. Moves the decode into a dedicated registry/properties.go and adds a malformed-block test case alongside the existing §12.6 coverage. Co-Authored-By: Claude Opus 4.8 --- .../internal/registry/dynamic_groups_test.go | 8 ++ pkg/relay/internal/registry/properties.go | 78 +++++++++++++++++++ pkg/relay/internal/registry/track.go | 67 ++++------------ 3 files changed, 99 insertions(+), 54 deletions(-) create mode 100644 pkg/relay/internal/registry/properties.go diff --git a/pkg/relay/internal/registry/dynamic_groups_test.go b/pkg/relay/internal/registry/dynamic_groups_test.go index ada1329..4f5b86d 100644 --- a/pkg/relay/internal/registry/dynamic_groups_test.go +++ b/pkg/relay/internal/registry/dynamic_groups_test.go @@ -55,6 +55,14 @@ func TestTrackEntry_DynamicGroups(t *testing.T) { t.Fatal("got nil error, want §12.6 protocol-violation error") } }) + t.Run("malformed block is an error", func(t *testing.T) { + t.Parallel() + // 0x40 is the first byte of a 2-byte varint with no second byte, so + // the Properties block fails to parse structurally. + if _, err := setProps([]byte{0x40}).DynamicGroups(); err == nil { + t.Fatal("got nil error, want a structural parse error") + } + }) } // TestConsiderNewGroupRequest pins the §10.2.13 relay decision and its diff --git a/pkg/relay/internal/registry/properties.go b/pkg/relay/internal/registry/properties.go new file mode 100644 index 0000000..64fe1e9 --- /dev/null +++ b/pkg/relay/internal/registry/properties.go @@ -0,0 +1,78 @@ +package registry + +import ( + "fmt" + + "github.com/floatdrop/moq-go/pkg/moqt/message" +) + +// decodedProperties holds the Track Properties the relay acts on, as opposed +// to the raw Properties block it forwards opaquely downstream per §9.6. The +// values are decoded once when an entry's Properties are set (see +// [TrackEntry.setPropertiesLocked]) so the §10.2.13 / §12 hot paths read a +// cached field instead of re-walking the block. +// +// To cache another property: add a field here, a case in +// [decodeTrackProperties], and an accessor on [TrackEntry]. The raw block is +// still parsed only once, so a new property costs a switch case, not a second +// pass over the bytes. +type decodedProperties struct { + // parseErr is a structural failure parsing the raw block (a malformed + // upstream Properties field). It is nil for a well-formed block. When + // set, no field below is meaningful, so every accessor reports it. + parseErr error + + // dynamicGroups is DYNAMIC_GROUPS=1 (§12.6). dynamicGroupsErr is a §12.6 + // PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1) by the upstream + // publisher. + dynamicGroups bool + dynamicGroupsErr error +} + +// decodeTrackProperties parses the raw Track Properties block once and pulls +// out the fields the relay acts on. A structural parse failure short-circuits +// to a parseErr that every accessor surfaces; per-property value violations +// (e.g. §12.6) are recorded on the matching field's error. +func decodeTrackProperties(raw []byte) decodedProperties { + pairs, err := message.ParseTrackProperties(raw) + if err != nil { + return decodedProperties{parseErr: err} + } + var d decodedProperties + for _, kv := range pairs { + switch kv.Type { + case message.PropertyDynamicGroups: + d.dynamicGroups, d.dynamicGroupsErr = decodeDynamicGroups(kv.IntVal) + } + } + return d +} + +// decodeDynamicGroups interprets a DYNAMIC_GROUPS value (§12.6): 0 is false, +// 1 is true, and anything greater is a PROTOCOL_VIOLATION so the caller can +// decline to act on it. +func decodeDynamicGroups(v uint64) (bool, error) { + switch v { + case 0: + return false, nil + case 1: + return true, nil + default: + return false, fmt.Errorf( + "relay: DYNAMIC_GROUPS value %d > 1 (§12.6 PROTOCOL_VIOLATION)", v) + } +} + +// DynamicGroups reports whether the track advertised DYNAMIC_GROUPS=1 (§12.6), +// using the value decoded once when Properties was set. The error is a §12.6 +// PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1), or a structural failure +// parsing the Properties block; either way the §10.2.13 caller declines the +// NEW_GROUP_REQUEST rather than acting on it. +func (e *TrackEntry) DynamicGroups() (bool, error) { + e.mu.RLock() + defer e.mu.RUnlock() + if e.decoded.parseErr != nil { + return false, e.decoded.parseErr + } + return e.decoded.dynamicGroups, e.decoded.dynamicGroupsErr +} diff --git a/pkg/relay/internal/registry/track.go b/pkg/relay/internal/registry/track.go index 02d80ae..c788c1c 100644 --- a/pkg/relay/internal/registry/track.go +++ b/pkg/relay/internal/registry/track.go @@ -14,7 +14,6 @@ package registry import ( "context" - "fmt" "log/slog" "slices" "sync" @@ -88,16 +87,13 @@ type TrackEntry struct { // block; the relay treats them opaquely. Properties []byte - // dynamicGroups caches the one Track Property the relay acts on rather - // than forwards: DYNAMIC_GROUPS (§12.6), needed by the §10.2.13 - // NEW_GROUP_REQUEST path. Properties are immutable for the entry's - // lifetime (§9.6, first-setter-wins), so the value is decoded once when - // Properties is set rather than re-parsing the whole block on every - // request. dynamicGroupsErr holds a §12.6 PROTOCOL_VIOLATION (a - // DYNAMIC_GROUPS value > 1) so the caller can decline the request. Both - // are set together by setPropertiesLocked. - dynamicGroups bool - dynamicGroupsErr error + // decoded holds the Track Properties the relay acts on, extracted once + // from the raw Properties block (which is otherwise forwarded opaquely + // per §9.6). Properties are immutable for the entry's lifetime (§9.6, + // first-setter-wins), so decoding happens once when Properties is set — + // see [decodeTrackProperties] for how to add a field. Set together with + // Properties by setPropertiesLocked. + decoded decodedProperties // LargestObject is the (Group, Object) high-water mark observed for // this track, updated by the fanout path on every incoming object and @@ -994,13 +990,12 @@ func (e *TrackEntry) SetProperties(props []byte) { e.mu.Unlock() } -// setPropertiesLocked stores the raw Properties bytes and decodes the one -// field the relay acts on (DYNAMIC_GROUPS) in the same step, so the cached -// dynamicGroups value never drifts from the raw bytes. Callers must hold -// e.mu. -func (e *TrackEntry) setPropertiesLocked(props []byte) { - e.Properties = props - e.dynamicGroups, e.dynamicGroupsErr = parseDynamicGroups(props) +// setPropertiesLocked stores the raw Properties bytes and decodes the fields +// the relay acts on in the same step, so the decoded values never drift from +// the raw bytes. Callers must hold e.mu. +func (e *TrackEntry) setPropertiesLocked(raw []byte) { + e.Properties = raw + e.decoded = decodeTrackProperties(raw) } // GetProperties returns the raw Track Properties captured from the upstream @@ -1012,42 +1007,6 @@ func (e *TrackEntry) GetProperties() []byte { return e.Properties } -// DynamicGroups reports whether the track advertised DYNAMIC_GROUPS=1 (§12.6), -// using the value decoded once when Properties was set. The error is a §12.6 -// PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1) by the upstream publisher; -// the §10.2.13 caller declines the NEW_GROUP_REQUEST rather than acting on it. -func (e *TrackEntry) DynamicGroups() (bool, error) { - e.mu.RLock() - defer e.mu.RUnlock() - return e.dynamicGroups, e.dynamicGroupsErr -} - -// parseDynamicGroups parses raw Track Properties and reports whether the track -// advertised DYNAMIC_GROUPS=1 (§12.6). An absent property reports false. A -// value greater than 1 is a PROTOCOL_VIOLATION per §12.6 and surfaces as an -// error so the caller can decline to act on it. -func parseDynamicGroups(props []byte) (bool, error) { - pairs, err := message.ParseTrackProperties(props) - if err != nil { - return false, err - } - for _, kv := range pairs { - if kv.Type != message.PropertyDynamicGroups { - continue - } - switch kv.IntVal { - case 0: - return false, nil - case 1: - return true, nil - default: - return false, fmt.Errorf( - "relay: DYNAMIC_GROUPS value %d > 1 (§12.6 PROTOCOL_VIOLATION)", kv.IntVal) - } - } - return false, nil -} - // CopyUpstream returns a snapshot of the current upstream slice. Callers // that want to iterate without holding the entry lock for the whole // iteration use this so they don't have to coordinate with mutators. From 441e3ef9fca92ade47a21fc70ed9f2ec1e270385 Mon Sep 17 00:00:00 2001 From: Vsevolod Strukchinsky Date: Sat, 27 Jun 2026 00:07:55 +0500 Subject: [PATCH 3/3] relay: use if over single-case switch in decodeTrackProperties gocritic's singleCaseSwitch flags the dispatch switch while only one property is decoded. Rewrite as an if; the comment notes it becomes a switch once a second property is added. Co-Authored-By: Claude Opus 4.8 --- pkg/relay/internal/registry/properties.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/relay/internal/registry/properties.go b/pkg/relay/internal/registry/properties.go index 64fe1e9..10a347c 100644 --- a/pkg/relay/internal/registry/properties.go +++ b/pkg/relay/internal/registry/properties.go @@ -12,10 +12,10 @@ import ( // [TrackEntry.setPropertiesLocked]) so the §10.2.13 / §12 hot paths read a // cached field instead of re-walking the block. // -// To cache another property: add a field here, a case in +// To cache another property: add a field here, a branch in // [decodeTrackProperties], and an accessor on [TrackEntry]. The raw block is -// still parsed only once, so a new property costs a switch case, not a second -// pass over the bytes. +// still parsed only once, so a new property costs a branch, not a second pass +// over the bytes. type decodedProperties struct { // parseErr is a structural failure parsing the raw block (a malformed // upstream Properties field). It is nil for a well-formed block. When @@ -40,8 +40,9 @@ func decodeTrackProperties(raw []byte) decodedProperties { } var d decodedProperties for _, kv := range pairs { - switch kv.Type { - case message.PropertyDynamicGroups: + // Dispatch each property the relay acts on to its decoder. Add a + // branch here (turning this into a switch) for each new property. + if kv.Type == message.PropertyDynamicGroups { d.dynamicGroups, d.dynamicGroupsErr = decodeDynamicGroups(kv.IntVal) } }