Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/relay/handler_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (h *sessionHandler) propagateNewGroupUpstream(
return
}

dynamic, err := trackSupportsDynamicGroups(entry.GetProperties())
dynamic, err := entry.DynamicGroups()
if err != nil {
// §12.6: a DYNAMIC_GROUPS value > 1 is a protocol violation by the
// upstream publisher. Scope the failure to declining the request
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package relay
package registry_test

import (
"testing"
Expand All @@ -15,36 +15,54 @@ func dynamicGroupsProps(t *testing.T, value uint64) []byte {
})
}

func TestTrackSupportsDynamicGroups(t *testing.T) {
// TestTrackEntry_DynamicGroups pins the §12.6 DYNAMIC_GROUPS decode that
// SetProperties performs once and caches: an absent property and value 0 are
// false, value 1 is true, and value > 1 is a PROTOCOL_VIOLATION surfaced as an
// error.
func TestTrackEntry_DynamicGroups(t *testing.T) {
t.Parallel()

setProps := func(props []byte) *registry.TrackEntry {
e := &registry.TrackEntry{}
e.SetProperties(props)
return e
}

t.Run("absent is false", func(t *testing.T) {
t.Parallel()
got, err := trackSupportsDynamicGroups(nil)
got, err := setProps(nil).DynamicGroups()
if err != nil || got {
t.Fatalf("got (%v, %v), want (false, nil)", got, err)
}
})
t.Run("value 0 is false", func(t *testing.T) {
t.Parallel()
got, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 0))
got, err := setProps(dynamicGroupsProps(t, 0)).DynamicGroups()
if err != nil || got {
t.Fatalf("got (%v, %v), want (false, nil)", got, err)
}
})
t.Run("value 1 is true", func(t *testing.T) {
t.Parallel()
got, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 1))
got, err := setProps(dynamicGroupsProps(t, 1)).DynamicGroups()
if err != nil || !got {
t.Fatalf("got (%v, %v), want (true, nil)", got, err)
}
})
t.Run("value > 1 is an error (§12.6)", func(t *testing.T) {
t.Parallel()
if _, err := trackSupportsDynamicGroups(dynamicGroupsProps(t, 2)); err == nil {
if _, err := setProps(dynamicGroupsProps(t, 2)).DynamicGroups(); err == nil {
t.Fatal("got nil error, want §12.6 protocol-violation error")
}
})
t.Run("malformed block is an error", func(t *testing.T) {
t.Parallel()
// 0x40 is the first byte of a 2-byte varint with no second byte, so
// the Properties block fails to parse structurally.
if _, err := setProps([]byte{0x40}).DynamicGroups(); err == nil {
t.Fatal("got nil error, want a structural parse error")
}
})
}

// TestConsiderNewGroupRequest pins the §10.2.13 relay decision and its
Expand Down
79 changes: 79 additions & 0 deletions pkg/relay/internal/registry/properties.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package registry

import (
"fmt"

"github.com/floatdrop/moq-go/pkg/moqt/message"
)

// decodedProperties holds the Track Properties the relay acts on, as opposed
// to the raw Properties block it forwards opaquely downstream per §9.6. The
// values are decoded once when an entry's Properties are set (see
// [TrackEntry.setPropertiesLocked]) so the §10.2.13 / §12 hot paths read a
// cached field instead of re-walking the block.
//
// To cache another property: add a field here, a branch in
// [decodeTrackProperties], and an accessor on [TrackEntry]. The raw block is
// still parsed only once, so a new property costs a branch, not a second pass
// over the bytes.
type decodedProperties struct {
// parseErr is a structural failure parsing the raw block (a malformed
// upstream Properties field). It is nil for a well-formed block. When
// set, no field below is meaningful, so every accessor reports it.
parseErr error

// dynamicGroups is DYNAMIC_GROUPS=1 (§12.6). dynamicGroupsErr is a §12.6
// PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1) by the upstream
// publisher.
dynamicGroups bool
dynamicGroupsErr error
}

// decodeTrackProperties parses the raw Track Properties block once and pulls
// out the fields the relay acts on. A structural parse failure short-circuits
// to a parseErr that every accessor surfaces; per-property value violations
// (e.g. §12.6) are recorded on the matching field's error.
func decodeTrackProperties(raw []byte) decodedProperties {
pairs, err := message.ParseTrackProperties(raw)
if err != nil {
return decodedProperties{parseErr: err}
}
var d decodedProperties
for _, kv := range pairs {
// Dispatch each property the relay acts on to its decoder. Add a
// branch here (turning this into a switch) for each new property.
if kv.Type == message.PropertyDynamicGroups {
d.dynamicGroups, d.dynamicGroupsErr = decodeDynamicGroups(kv.IntVal)
}
}
return d
}

// decodeDynamicGroups interprets a DYNAMIC_GROUPS value (§12.6): 0 is false,
// 1 is true, and anything greater is a PROTOCOL_VIOLATION so the caller can
// decline to act on it.
func decodeDynamicGroups(v uint64) (bool, error) {
switch v {
case 0:
return false, nil
case 1:
return true, nil
default:
return false, fmt.Errorf(
"relay: DYNAMIC_GROUPS value %d > 1 (§12.6 PROTOCOL_VIOLATION)", v)
}
}

// DynamicGroups reports whether the track advertised DYNAMIC_GROUPS=1 (§12.6),
// using the value decoded once when Properties was set. The error is a §12.6
// PROTOCOL_VIOLATION (a DYNAMIC_GROUPS value > 1), or a structural failure
// parsing the Properties block; either way the §10.2.13 caller declines the
// NEW_GROUP_REQUEST rather than acting on it.
func (e *TrackEntry) DynamicGroups() (bool, error) {
e.mu.RLock()
defer e.mu.RUnlock()
if e.decoded.parseErr != nil {
return false, e.decoded.parseErr
}
return e.decoded.dynamicGroups, e.decoded.dynamicGroupsErr
}
20 changes: 18 additions & 2 deletions pkg/relay/internal/registry/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ type TrackEntry struct {
// block; the relay treats them opaquely.
Properties []byte

// decoded holds the Track Properties the relay acts on, extracted once
// from the raw Properties block (which is otherwise forwarded opaquely
// per §9.6). Properties are immutable for the entry's lifetime (§9.6,
// first-setter-wins), so decoding happens once when Properties is set —
// see [decodeTrackProperties] for how to add a field. Set together with
// Properties by setPropertiesLocked.
decoded decodedProperties

// LargestObject is the (Group, Object) high-water mark observed for
// this track, updated by the fanout path on every incoming object and
// by upstream control messages that carry a LARGEST_OBJECT value. §10.2.11
Expand Down Expand Up @@ -547,7 +555,7 @@ func (r *TrackRegistry) AddUpstream(
// captured by a prior caller — §9.6 expects them to be
// stable for the lifetime of the track entry, so the first
// setter wins.
entry.Properties = conf.properties
entry.setPropertiesLocked(conf.properties)
}
if becameNonEmpty {
r.publishTrackToDiscovery(entry)
Expand Down Expand Up @@ -978,10 +986,18 @@ func (e *TrackEntry) UpdateLargestAndDetectNew(
// be replayed verbatim.)
func (e *TrackEntry) SetProperties(props []byte) {
e.mu.Lock()
e.Properties = props
e.setPropertiesLocked(props)
e.mu.Unlock()
}

// setPropertiesLocked stores the raw Properties bytes and decodes the fields
// the relay acts on in the same step, so the decoded values never drift from
// the raw bytes. Callers must hold e.mu.
func (e *TrackEntry) setPropertiesLocked(raw []byte) {
e.Properties = raw
e.decoded = decodeTrackProperties(raw)
}

// GetProperties returns the raw Track Properties captured from the upstream
// publisher. The returned slice is the same byte buffer stored on the entry;
// callers MUST NOT mutate it.
Expand Down
33 changes: 0 additions & 33 deletions pkg/relay/newgroup.go

This file was deleted.