Skip to content

Commit 401ab97

Browse files
fix: restore events tail streaming (#209)
1 parent 5fa9e00 commit 401ab97

File tree

4 files changed

+132
-6
lines changed

4 files changed

+132
-6
lines changed

api/insights/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func NewClient(appID, apiKey string, region algoliaInsights.Region) (*Client, er
3131
ApiKey: apiKey,
3232
UserAgent: userAgent,
3333
},
34+
Region: region,
3435
}
3536
client, err := algoliaInsights.NewClientWithConfig(clientConfig)
3637
if err != nil {

api/insights/client_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package insights
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
7+
algoliaInsights "github.com/algolia/algoliasearch-client-go/v4/algolia/insights"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestNewClientSetsRequestedRegion(t *testing.T) {
12+
client, err := NewClient("test-app-id", "test-api-key", algoliaInsights.DE)
13+
require.NoError(t, err)
14+
15+
cfg := client.GetConfiguration()
16+
require.Equal(t, algoliaInsights.DE, cfg.Region)
17+
require.NotEmpty(t, cfg.Hosts)
18+
19+
host := reflect.ValueOf(cfg.Hosts[0]).FieldByName("host").String()
20+
require.Equal(t, "insights.de.algolia.io", host)
21+
}

pkg/cmd/events/tail/tail.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,15 @@ func runTailCmd(opts *TailOptions) error {
113113
fmt.Fprint(opts.IO.Out, "\nWaiting for events... Press Ctrl+C to stop.\n")
114114
}
115115

116-
c := time.Tick(Interval)
117-
for t := range c {
118-
utc := t.UTC()
119-
events, err := client.GetEvents(utc.Add(-1*time.Second), utc, 1000)
116+
ticker := time.NewTicker(Interval)
117+
defer ticker.Stop()
118+
119+
windowStart := time.Now().UTC().Add(-Interval)
120+
seenRequestIDs := map[string]time.Time{}
121+
122+
for {
123+
windowEnd := time.Now().UTC()
124+
events, err := client.GetEvents(windowStart, windowEnd, 1000)
120125
if err != nil {
121126
if strings.Contains(err.Error(), "The log processing region does not match") {
122127
cs := opts.IO.ColorScheme()
@@ -127,9 +132,12 @@ func runTailCmd(opts *TailOptions) error {
127132
`, cs.FailureIcon(), opts.Region)
128133
return errors.New(errDetails)
129134
}
135+
136+
return err
130137
}
131138

132-
for _, event := range events.Events {
139+
pruneSeenRequestIDs(seenRequestIDs, windowStart.Add(-Interval))
140+
for _, event := range unseenEvents(events.Events, seenRequestIDs, windowEnd) {
133141
if p != nil {
134142
if err := p.Print(opts.IO, event); err != nil {
135143
return err
@@ -140,9 +148,35 @@ func runTailCmd(opts *TailOptions) error {
140148
}
141149
}
142150
}
151+
152+
windowStart = windowEnd.Add(-Interval)
153+
<-ticker.C
143154
}
155+
}
144156

145-
return nil
157+
func unseenEvents(events []insights.EventWrapper, seenRequestIDs map[string]time.Time, seenAt time.Time) []insights.EventWrapper {
158+
freshEvents := make([]insights.EventWrapper, 0, len(events))
159+
for _, event := range events {
160+
requestID := event.RequestID
161+
if requestID != "" {
162+
if _, ok := seenRequestIDs[requestID]; ok {
163+
continue
164+
}
165+
seenRequestIDs[requestID] = seenAt
166+
}
167+
168+
freshEvents = append(freshEvents, event)
169+
}
170+
171+
return freshEvents
172+
}
173+
174+
func pruneSeenRequestIDs(seenRequestIDs map[string]time.Time, cutoff time.Time) {
175+
for requestID, timestamp := range seenRequestIDs {
176+
if timestamp.Before(cutoff) {
177+
delete(seenRequestIDs, requestID)
178+
}
179+
}
146180
}
147181

148182
func printEvent(io *iostreams.IOStreams, event insights.EventWrapper) error {

pkg/cmd/events/tail/tail_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package tail
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/algolia/cli/api/insights"
10+
)
11+
12+
func TestUnseenEventsSkipsDuplicateRequestIDs(t *testing.T) {
13+
now := time.Now().UTC()
14+
seenAt := now.Add(10 * time.Second)
15+
seenRequestIDs := map[string]time.Time{}
16+
events := []insights.EventWrapper{
17+
{
18+
RequestID: "req-1",
19+
Event: insights.Event{
20+
EventName: "first",
21+
Timestamp: insights.Timestamp{Time: now},
22+
},
23+
},
24+
{
25+
RequestID: "req-1",
26+
Event: insights.Event{
27+
EventName: "duplicate",
28+
Timestamp: insights.Timestamp{Time: now.Add(time.Second)},
29+
},
30+
},
31+
{
32+
RequestID: "req-2",
33+
Event: insights.Event{
34+
EventName: "second",
35+
Timestamp: insights.Timestamp{Time: now.Add(2 * time.Second)},
36+
},
37+
},
38+
}
39+
40+
freshEvents := unseenEvents(events, seenRequestIDs, seenAt)
41+
42+
require.Len(t, freshEvents, 2)
43+
require.Equal(t, "first", freshEvents[0].Event.EventName)
44+
require.Equal(t, "second", freshEvents[1].Event.EventName)
45+
require.Equal(t, seenAt, seenRequestIDs["req-1"])
46+
require.Equal(t, seenAt, seenRequestIDs["req-2"])
47+
}
48+
49+
func TestUnseenEventsKeepsEventsWithoutRequestID(t *testing.T) {
50+
seenAt := time.Now().UTC()
51+
freshEvents := unseenEvents([]insights.EventWrapper{
52+
{Event: insights.Event{EventName: "first"}},
53+
{Event: insights.Event{EventName: "second"}},
54+
}, map[string]time.Time{}, seenAt)
55+
56+
require.Len(t, freshEvents, 2)
57+
}
58+
59+
func TestPruneSeenRequestIDsRemovesOldEntries(t *testing.T) {
60+
now := time.Now().UTC()
61+
seenRequestIDs := map[string]time.Time{
62+
"stale": now.Add(-2 * Interval),
63+
"recent": now.Add(-Interval / 2),
64+
}
65+
66+
pruneSeenRequestIDs(seenRequestIDs, now.Add(-Interval))
67+
68+
require.NotContains(t, seenRequestIDs, "stale")
69+
require.Contains(t, seenRequestIDs, "recent")
70+
}

0 commit comments

Comments
 (0)