diff --git a/pkg/relay/handler_subscribe.go b/pkg/relay/handler_subscribe.go index 5109aa8..ed0637e 100644 --- a/pkg/relay/handler_subscribe.go +++ b/pkg/relay/handler_subscribe.go @@ -283,7 +283,7 @@ func (h *sessionHandler) propagateNewGroupUpstream( return } - dynamic, err := trackSupportsDynamicGroups(entry.GetProperties()) + dynamic, err := entry.DynamicGroups() if err != nil { // §12.6: a DYNAMIC_GROUPS value > 1 is a protocol violation by the // upstream publisher. Scope the failure to declining the request diff --git a/pkg/relay/newgroup_internal_test.go b/pkg/relay/internal/registry/dynamic_groups_test.go similarity index 77% rename from pkg/relay/newgroup_internal_test.go rename to pkg/relay/internal/registry/dynamic_groups_test.go index 9d5b9eb..4f5b86d 100644 --- a/pkg/relay/newgroup_internal_test.go +++ b/pkg/relay/internal/registry/dynamic_groups_test.go @@ -1,4 +1,4 @@ -package relay +package registry_test import ( "testing" @@ -15,36 +15,54 @@ func dynamicGroupsProps(t *testing.T, value uint64) []byte { }) } -func TestTrackSupportsDynamicGroups(t *testing.T) { +// TestTrackEntry_DynamicGroups pins the §12.6 DYNAMIC_GROUPS decode that +// SetProperties performs once and caches: an absent property and value 0 are +// false, value 1 is true, and value > 1 is a PROTOCOL_VIOLATION surfaced as an +// error. +func TestTrackEntry_DynamicGroups(t *testing.T) { t.Parallel() + setProps := func(props []byte) *registry.TrackEntry { + e := ®istry.TrackEntry{} + e.SetProperties(props) + return e + } + t.Run("absent is false", func(t *testing.T) { t.Parallel() - got, err := trackSupportsDynamicGroups(nil) + got, err := setProps(nil).DynamicGroups() if err != nil || got { t.Fatalf("got (%v, %v), want (false, nil)", got, err) } }) t.Run("value 0 is false", func(t *testing.T) { t.Parallel() - got, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 0)) + got, err := setProps(dynamicGroupsProps(t, 0)).DynamicGroups() if err != nil || got { t.Fatalf("got (%v, %v), want (false, nil)", got, err) } }) t.Run("value 1 is true", func(t *testing.T) { t.Parallel() - got, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 1)) + got, err := setProps(dynamicGroupsProps(t, 1)).DynamicGroups() if err != nil || !got { t.Fatalf("got (%v, %v), want (true, nil)", got, err) } }) t.Run("value > 1 is an error (§12.6)", func(t *testing.T) { t.Parallel() - if _, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 2)); err == nil { + if _, err := setProps(dynamicGroupsProps(t, 2)).DynamicGroups(); err == nil { t.Fatal("got nil error, want §12.6 protocol-violation error") } }) + t.Run("malformed block is an error", func(t *testing.T) { + t.Parallel() + // 0x40 is the first byte of a 2-byte varint with no second byte, so + // the Properties block fails to parse structurally. + if _, err := setProps([]byte{0x40}).DynamicGroups(); err == nil { + t.Fatal("got nil error, want a structural parse error") + } + }) } // TestConsiderNewGroupRequest pins the §10.2.13 relay decision and its diff --git a/pkg/relay/internal/registry/properties.go b/pkg/relay/internal/registry/properties.go new file mode 100644 index 0000000..10a347c --- /dev/null +++ b/pkg/relay/internal/registry/properties.go @@ -0,0 +1,79 @@ +package registry + +import ( + "fmt" + + "github.com/floatdrop/moq-go/pkg/moqt/message" +) + +// decodedProperties holds the Track Properties the relay acts on, as opposed +// to the raw Properties block it forwards opaquely downstream per §9.6. The +// values are decoded once when an entry's Properties are set (see +// [TrackEntry.setPropertiesLocked]) so the §10.2.13 / §12 hot paths read a +// cached field instead of re-walking the block. +// +// To cache another property: add a field here, a branch in +// [decodeTrackProperties], and an accessor on [TrackEntry]. The raw block is +// still parsed only once, so a new property costs a branch, not a second pass +// over the bytes. +type decodedProperties struct { + // parseErr is a structural failure parsing the raw block (a malformed + // upstream Properties field). It is nil for a well-formed block. When + // set, no field below is meaningful, so every accessor reports it. + parseErr error + + // dynamicGroups is DYNAMIC_GROUPS=1 (§12.6). dynamicGroupsErr is a §12.6 + // PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1) by the upstream + // publisher. + dynamicGroups bool + dynamicGroupsErr error +} + +// decodeTrackProperties parses the raw Track Properties block once and pulls +// out the fields the relay acts on. A structural parse failure short-circuits +// to a parseErr that every accessor surfaces; per-property value violations +// (e.g. §12.6) are recorded on the matching field's error. +func decodeTrackProperties(raw []byte) decodedProperties { + pairs, err := message.ParseTrackProperties(raw) + if err != nil { + return decodedProperties{parseErr: err} + } + var d decodedProperties + for _, kv := range pairs { + // Dispatch each property the relay acts on to its decoder. Add a + // branch here (turning this into a switch) for each new property. + if kv.Type == message.PropertyDynamicGroups { + d.dynamicGroups, d.dynamicGroupsErr = decodeDynamicGroups(kv.IntVal) + } + } + return d +} + +// decodeDynamicGroups interprets a DYNAMIC_GROUPS value (§12.6): 0 is false, +// 1 is true, and anything greater is a PROTOCOL_VIOLATION so the caller can +// decline to act on it. +func decodeDynamicGroups(v uint64) (bool, error) { + switch v { + case 0: + return false, nil + case 1: + return true, nil + default: + return false, fmt.Errorf( + "relay: DYNAMIC_GROUPS value %d > 1 (§12.6 PROTOCOL_VIOLATION)", v) + } +} + +// DynamicGroups reports whether the track advertised DYNAMIC_GROUPS=1 (§12.6), +// using the value decoded once when Properties was set. The error is a §12.6 +// PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1), or a structural failure +// parsing the Properties block; either way the §10.2.13 caller declines the +// NEW_GROUP_REQUEST rather than acting on it. +func (e *TrackEntry) DynamicGroups() (bool, error) { + e.mu.RLock() + defer e.mu.RUnlock() + if e.decoded.parseErr != nil { + return false, e.decoded.parseErr + } + return e.decoded.dynamicGroups, e.decoded.dynamicGroupsErr +} diff --git a/pkg/relay/internal/registry/track.go b/pkg/relay/internal/registry/track.go index 51cd8d0..c788c1c 100644 --- a/pkg/relay/internal/registry/track.go +++ b/pkg/relay/internal/registry/track.go @@ -87,6 +87,14 @@ type TrackEntry struct { // block; the relay treats them opaquely. Properties []byte + // decoded holds the Track Properties the relay acts on, extracted once + // from the raw Properties block (which is otherwise forwarded opaquely + // per §9.6). Properties are immutable for the entry's lifetime (§9.6, + // first-setter-wins), so decoding happens once when Properties is set — + // see [decodeTrackProperties] for how to add a field. Set together with + // Properties by setPropertiesLocked. + decoded decodedProperties + // LargestObject is the (Group, Object) high-water mark observed for // this track, updated by the fanout path on every incoming object and // by upstream control messages that carry a LARGEST_OBJECT value. §10.2.11 @@ -547,7 +555,7 @@ func (r *TrackRegistry) AddUpstream( // captured by a prior caller — §9.6 expects them to be // stable for the lifetime of the track entry, so the first // setter wins. - entry.Properties = conf.properties + entry.setPropertiesLocked(conf.properties) } if becameNonEmpty { r.publishTrackToDiscovery(entry) @@ -978,10 +986,18 @@ func (e *TrackEntry) UpdateLargestAndDetectNew( // be replayed verbatim.) func (e *TrackEntry) SetProperties(props []byte) { e.mu.Lock() - e.Properties = props + e.setPropertiesLocked(props) e.mu.Unlock() } +// setPropertiesLocked stores the raw Properties bytes and decodes the fields +// the relay acts on in the same step, so the decoded values never drift from +// the raw bytes. Callers must hold e.mu. +func (e *TrackEntry) setPropertiesLocked(raw []byte) { + e.Properties = raw + e.decoded = decodeTrackProperties(raw) +} + // GetProperties returns the raw Track Properties captured from the upstream // publisher. The returned slice is the same byte buffer stored on the entry; // callers MUST NOT mutate it. diff --git a/pkg/relay/newgroup.go b/pkg/relay/newgroup.go deleted file mode 100644 index bec1859..0000000 --- a/pkg/relay/newgroup.go +++ /dev/null @@ -1,33 +0,0 @@ -package relay - -import ( - "fmt" - - "github.com/floatdrop/moq-go/pkg/moqt/message" -) - -// trackSupportsDynamicGroups parses raw Track Properties and reports whether -// the track advertised DYNAMIC_GROUPS=1 (§12.6). An absent property reports -// false. A value greater than 1 is a PROTOCOL_VIOLATION per §12.6 and surfaces -// as an error so the caller can decline to act on it. -func trackSupportsDynamicGroups(props []byte) (bool, error) { - pairs, err := message.ParseTrackProperties(props) - if err != nil { - return false, err - } - for _, kv := range pairs { - if kv.Type != message.PropertyDynamicGroups { - continue - } - switch kv.IntVal { - case 0: - return false, nil - case 1: - return true, nil - default: - return false, fmt.Errorf( - "relay: DYNAMIC_GROUPS value %d > 1 (§12.6 PROTOCOL_VIOLATION)", kv.IntVal) - } - } - return false, nil -}