-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstream_turn_host.go
More file actions
94 lines (81 loc) · 2.67 KB
/
stream_turn_host.go
File metadata and controls
94 lines (81 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package agentremote
import "sync"
// Aborter describes anything that can be told to stop, given a
// human-readable reason. sdk.Turn satisfies this interface.
type Aborter interface {
	// Abort cancels the underlying work; reason says why.
	Abort(reason string)
}
// StreamTurnHostCallbacks bundles the bridge-specific hooks that a
// StreamTurnHost needs in order to work with an arbitrary state type S.
type StreamTurnHostCallbacks[S any] struct {
	// GetAborter extracts the aborter (typically an *sdk.Turn) held by a
	// state. It may return nil when the state has nothing to abort.
	GetAborter func(state *S) Aborter
}
// StreamTurnHost keeps per-turn stream states in a map keyed by turn ID and
// guards that map with a mutex. It offers the drain/abort and state cleanup
// helpers that are shared across bridges.
type StreamTurnHost[S any] struct {
	// mu guards states.
	mu sync.Mutex
	// states maps a turn ID to its stream state.
	states map[string]*S
	// callbacks holds the bridge-specific hooks supplied at construction.
	callbacks StreamTurnHostCallbacks[S]
}
// NewStreamTurnHost returns a StreamTurnHost with an empty, ready-to-use
// state map and the supplied callbacks installed.
func NewStreamTurnHost[S any](cb StreamTurnHostCallbacks[S]) *StreamTurnHost[S] {
	host := new(StreamTurnHost[S])
	host.states = map[string]*S{}
	host.callbacks = cb
	return host
}
// Lock takes the host mutex. Pair it with Unlock around calls to the
// *Locked accessors.
func (h *StreamTurnHost[S]) Lock() {
	h.mu.Lock()
}
// Unlock gives up the host mutex previously taken with Lock.
func (h *StreamTurnHost[S]) Unlock() {
	h.mu.Unlock()
}
// GetLocked looks up the state registered under turnID and returns it, or
// nil when there is no entry. Must be called with the lock held.
func (h *StreamTurnHost[S]) GetLocked(turnID string) *S {
	if state, found := h.states[turnID]; found {
		return state
	}
	return nil
}
// SetLocked stores state under turnID, replacing any existing entry. Must be
// called with the lock held.
//
// The backing map is created on first use, so a StreamTurnHost that was not
// built by NewStreamTurnHost (a zero value) does not panic on its first write.
func (h *StreamTurnHost[S]) SetLocked(turnID string, state *S) {
	if h.states == nil {
		// Assigning into a nil map panics, unlike lookups and deletes.
		h.states = make(map[string]*S)
	}
	h.states[turnID] = state
}
// DeleteLocked drops the entry for turnID; it does nothing when no such
// entry exists. Must be called with the lock held.
func (h *StreamTurnHost[S]) DeleteLocked(turnID string) {
	delete(h.states, turnID)
}
// DeleteIfMatch removes the entry for turnID, but only while it still refers
// to the exact state pointer passed in; an entry that has since been replaced
// is left alone. It acquires the host mutex itself.
func (h *StreamTurnHost[S]) DeleteIfMatch(turnID string, state *S) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if current := h.states[turnID]; current != state {
		return
	}
	delete(h.states, turnID)
}
// IsActive reports whether turnID currently has an entry in the state map.
// Only key presence is checked, so an entry whose state is nil still counts.
// It acquires the host mutex itself.
func (h *StreamTurnHost[S]) IsActive(turnID string) bool {
	h.mu.Lock()
	_, present := h.states[turnID]
	h.mu.Unlock()
	return present
}
// DrainAndAbort empties the host and aborts every turn that was registered,
// passing reason to each. This is the standard disconnect cleanup path.
//
// The mutex is held only while the state map is swapped for a fresh one, so
// the GetAborter callback and Abort both run without the lock. Nil states,
// and states for which GetAborter returns nil, are skipped. When no
// GetAborter callback is configured the map is still cleared, but nothing is
// aborted.
func (h *StreamTurnHost[S]) DrainAndAbort(reason string) {
	// Detach the current map so the rest of the work runs outside the lock.
	h.mu.Lock()
	drained := h.states
	h.states = make(map[string]*S)
	h.mu.Unlock()

	resolve := h.callbacks.GetAborter
	if resolve == nil {
		return
	}

	// Resolve every aborter first; only then start aborting.
	pending := make([]Aborter, 0, len(drained))
	for _, st := range drained {
		if st == nil {
			continue
		}
		if target := resolve(st); target != nil {
			pending = append(pending, target)
		}
	}
	for i := range pending {
		pending[i].Abort(reason)
	}
}