Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/common/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewStreamingConnector(
Logger: logger,
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertTimeout: conf.InsertTimeout,
InsertQuerySettings: conf.InsertQuerySettings,
MeterQuerySettings: conf.MeterQuerySettings,
EnablePrewhere: conf.EnablePrewhere,
Expand Down
10 changes: 10 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ type AggregationConfiguration struct {
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool

// InsertTimeout bounds synchronous batch inserts when the caller's context has no
// deadline. The native batch path arms connection read deadlines only from the
// context, so without it a half-open connection could hang an insert indefinitely.
InsertTimeout time.Duration

// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
Expand Down Expand Up @@ -57,6 +62,10 @@ func (c AggregationConfiguration) Validate() error {
return errors.New("async insert wait is set but async insert is not")
}

if c.InsertTimeout <= 0 {
return errors.New("insert timeout must be greater than 0")
}

return nil
}

Expand Down Expand Up @@ -199,6 +208,7 @@ func ConfigureAggregation(v *viper.Viper) {
v.SetDefault("aggregation.eventsTableName", "om_events")
v.SetDefault("aggregation.asyncInsert", false)
v.SetDefault("aggregation.asyncInsertWait", false)
v.SetDefault("aggregation.insertTimeout", "1m")

v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000")
v.SetDefault("aggregation.clickhouse.tls", false)
Expand Down
5 changes: 1 addition & 4 deletions app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestComplete(t *testing.T) {
EventsTableName: "om_events",
AsyncInsert: false,
AsyncInsertWait: false,
InsertTimeout: time.Minute,
},
Entitlements: EntitlementsConfiguration{
GracePeriod: datetime.ISODurationString("P1D"),
Expand Down Expand Up @@ -259,10 +260,6 @@ func TestComplete(t *testing.T) {
},
},
},
Storage: StorageConfiguration{
AsyncInsert: false,
AsyncInsertWait: false,
},
},
Dedupe: DedupeConfiguration{
Enabled: true,
Expand Down
32 changes: 0 additions & 32 deletions app/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type SinkConfiguration struct {
IngestNotifications IngestNotificationsConfiguration
// Kafka client/Consumer configuration
Kafka KafkaConfig
// TODO: remove, config moved to aggregation config
// Storage configuration
Storage StorageConfiguration

// NamespaceRefetchTimeout is the timeout for updating namespaces and consumer subscription.
// It must be less than NamespaceRefetch interval.
Expand Down Expand Up @@ -108,30 +105,6 @@ func (c IngestNotificationsConfiguration) Validate() error {
return errors.Join(errs...)
}

type StorageConfiguration struct {
// Set true for ClickHouse first store the incoming inserts into an in-memory buffer
// before flushing them regularly to disk.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
AsyncInsert bool
// Set true if you want an insert statement to return with an acknowledgment immediately
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool

// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
QuerySettings map[string]string
}

func (c StorageConfiguration) Validate() error {
if c.AsyncInsertWait && !c.AsyncInsert {
return errors.New("AsyncInsertWait is set but AsyncInsert is not")
}

return nil
}

// ConfigureSink setup Sink specific configuration defaults for provided *viper.Viper instance.
func ConfigureSink(v *viper.Viper) {
// Sink Dedupe
Expand Down Expand Up @@ -167,11 +140,6 @@ func ConfigureSink(v *viper.Viper) {
v.SetDefault("sink.meterRefetchInterval", "15s")
v.SetDefault("sink.logDroppedEvents", false)

// TODO: remove, config moved to aggregation config
// Sink Storage
v.SetDefault("sink.storage.asyncInsert", false)
v.SetDefault("sink.storage.asyncInsertWait", false)

// Sink Kafka configuration
ConfigureKafkaConfiguration(v, "sink")

Expand Down
14 changes: 14 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ server:
# # Setting it to 0 disables cache entry expiration.
# cacheTTL: 5m

# aggregation:
# # With async insert ClickHouse buffers and batches inserts server-side.
# # See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
# asyncInsert: false
# asyncInsertWait: false
# # insertTimeout bounds synchronous batch inserts when the caller provides no deadline.
# # Protects against half-open ClickHouse connections hanging the insert indefinitely.
# insertTimeout: 1m
# clickhouse:
# address: 127.0.0.1:9000
# username: default
# password: default
# database: openmeter

# dedupe:
# enabled: true
# driver: redis
Expand Down
80 changes: 66 additions & 14 deletions openmeter/streaming/clickhouse/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Config struct {
EventsTableName string
AsyncInsert bool
AsyncInsertWait bool
InsertTimeout time.Duration
InsertQuerySettings map[string]string
MeterQuerySettings map[string]string
EnablePrewhere bool
Expand Down Expand Up @@ -258,30 +259,81 @@ func (c *Connector) CountEvents(ctx context.Context, namespace string, params st
}

func (c *Connector) BatchInsert(ctx context.Context, rawEvents []streaming.RawEvent) error {
var err error

// Insert raw events
query := InsertEventsQuery{
Database: c.config.Database,
EventsTableName: c.config.EventsTableName,
Events: rawEvents,
QuerySettings: c.config.InsertQuerySettings,
if len(rawEvents) == 0 {
return nil
}
sql, args := query.ToSQL()

// By default, ClickHouse is writing data synchronously.
// Async insert requires inline data in the INSERT statement, so it uses a textual query.
// See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
if c.config.AsyncInsert {
query := InsertEventsQuery{
Database: c.config.Database,
EventsTableName: c.config.EventsTableName,
Events: rawEvents,
QuerySettings: c.config.InsertQuerySettings,
}
sql, args := query.ToSQL()

// With the `wait_for_async_insert` setting, you can configure
// if you want an insert statement to return with an acknowledgment
// either immediately after the data got inserted into the buffer.
err = c.config.ClickHouse.Exec(clickhouse.Context(ctx, clickhouse.WithAsync(c.config.AsyncInsertWait)), sql, args...)
} else {
err = c.config.ClickHouse.Exec(ctx, sql, args...)
if err := c.config.ClickHouse.Exec(clickhouse.Context(ctx, clickhouse.WithAsync(c.config.AsyncInsertWait)), sql, args...); err != nil {
return fmt.Errorf("failed to batch insert raw events: %w", err)
}

return nil
}

// Synchronous inserts stream native columnar blocks, avoiding client-side
// parameter interpolation and server-side VALUES parsing on large batches.
if len(c.config.InsertQuerySettings) > 0 {
settings := clickhouse.Settings{}
for key, value := range c.config.InsertQuerySettings {
settings[key] = value
}

ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.

// Unlike Exec, the batch path only arms connection read deadlines when the context
// carries one, so an unbounded context would hang forever on a half-open connection.
// Respect a caller-provided deadline; otherwise bound the insert by InsertTimeout.
if _, ok := ctx.Deadline(); !ok && c.config.InsertTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.config.InsertTimeout)
defer cancel()
}

tableName := getTableName(c.config.Database, c.config.EventsTableName)

batch, err := c.config.ClickHouse.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s (%s)", tableName, strings.Join(rawEventColumns, ", ")))
Comment thread
tothandras marked this conversation as resolved.
if err != nil {
return fmt.Errorf("failed to batch insert raw events: %w", err)
return fmt.Errorf("failed to prepare batch insert: %w", err)
}
defer func() {
_ = batch.Close()
}()

for _, event := range rawEvents {
err := batch.Append(
event.Namespace,
event.ID,
event.Type,
event.Source,
event.Subject,
event.Time,
event.Data,
event.IngestedAt,
event.StoredAt,
event.StoreRowID,
)
if err != nil {
return fmt.Errorf("failed to append raw event to batch (namespace: %s, id: %s): %w", event.Namespace, event.ID, err)
}
}

if err := batch.Send(); err != nil {
return fmt.Errorf("failed to send raw events batch: %w", err)
}

return nil
Expand Down
135 changes: 126 additions & 9 deletions openmeter/streaming/clickhouse/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ func TestConnector_QueryMeter(t *testing.T) {
}

func TestBatchInsert(t *testing.T) {
connector, mockCH := GetMockConnector(t)

ctx := context.Background()
now := time.Now().UTC()

Expand All @@ -190,13 +188,132 @@ func TestBatchInsert(t *testing.T) {
},
}

// Mock the batch insert
mockCH.On("Exec", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(nil).Once()
expectedInsertStatement := "INSERT INTO testdb.events (namespace, id, type, source, subject, time, data, ingested_at, stored_at, store_row_id)"
expectedAppendArgs := []any{
events[0].Namespace,
events[0].ID,
events[0].Type,
events[0].Source,
events[0].Subject,
events[0].Time,
events[0].Data,
events[0].IngestedAt,
events[0].StoredAt,
events[0].StoreRowID,
}

// Execute the method
err := connector.BatchInsert(ctx, events)
require.NoError(t, err)
t.Run("SyncInsert", func(t *testing.T) {
connector, mockCH := GetMockConnector(t)

// Verify mocks were called
mockCH.AssertExpectations(t)
mockBatch := NewMockBatch()
mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once()
mockBatch.On("Append", expectedAppendArgs).Return(nil).Once()
mockBatch.On("Send").Return(nil).Once()
mockBatch.On("Close").Return(nil).Once()

err := connector.BatchInsert(ctx, events)
require.NoError(t, err)

mockCH.AssertExpectations(t)
mockBatch.AssertExpectations(t)
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.

t.Run("SyncInsertMultipleEvents", func(t *testing.T) {
connector, mockCH := GetMockConnector(t)

multiEvents := []streaming.RawEvent{events[0], {Namespace: "test-namespace-2", ID: "2", Time: now}}

mockBatch := NewMockBatch()
mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once()
mockBatch.On("Append", mock.Anything).Return(nil).Times(len(multiEvents))
mockBatch.On("Send").Return(nil).Once()
mockBatch.On("Close").Return(nil).Once()

err := connector.BatchInsert(ctx, multiEvents)
require.NoError(t, err)

mockCH.AssertExpectations(t)
mockBatch.AssertExpectations(t)
})

t.Run("SyncInsertEmptyBatch", func(t *testing.T) {
connector, mockCH := GetMockConnector(t)

err := connector.BatchInsert(ctx, nil)
require.NoError(t, err)

// No PrepareBatch or Exec calls expected
mockCH.AssertExpectations(t)
})

t.Run("SyncInsertPrepareBatchError", func(t *testing.T) {
connector, mockCH := GetMockConnector(t)

mockBatch := NewMockBatch()
mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, errors.New("prepare failed")).Once()

err := connector.BatchInsert(ctx, events)
require.ErrorContains(t, err, "failed to prepare batch insert")

mockCH.AssertExpectations(t)
})

t.Run("SyncInsertAppendError", func(t *testing.T) {
connector, mockCH := GetMockConnector(t)

mockBatch := NewMockBatch()
mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once()
mockBatch.On("Append", expectedAppendArgs).Return(errors.New("append failed")).Once()
mockBatch.On("Close").Return(nil).Once()

err := connector.BatchInsert(ctx, events)
require.ErrorContains(t, err, "failed to append raw event to batch")

mockCH.AssertExpectations(t)
mockBatch.AssertExpectations(t)
})

t.Run("SyncInsertSendError", func(t *testing.T) {
connector, mockCH := GetMockConnector(t)

mockBatch := NewMockBatch()
mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once()
mockBatch.On("Append", expectedAppendArgs).Return(nil).Once()
mockBatch.On("Send").Return(errors.New("send failed")).Once()
mockBatch.On("Close").Return(nil).Once()

err := connector.BatchInsert(ctx, events)
require.ErrorContains(t, err, "failed to send raw events batch")

mockCH.AssertExpectations(t)
mockBatch.AssertExpectations(t)
})

t.Run("AsyncInsert", func(t *testing.T) {
connector, mockCH := GetMockConnector(t, func(config Config) Config {
config.AsyncInsert = true
return config
})

mockCH.On("Exec", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(nil).Once()

err := connector.BatchInsert(ctx, events)
require.NoError(t, err)

mockCH.AssertExpectations(t)
})

t.Run("AsyncInsertExecError", func(t *testing.T) {
connector, mockCH := GetMockConnector(t, func(config Config) Config {
config.AsyncInsert = true
return config
})

mockCH.On("Exec", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(errors.New("exec failed")).Once()

err := connector.BatchInsert(ctx, events)
require.ErrorContains(t, err, "failed to batch insert raw events")

mockCH.AssertExpectations(t)
})
}
Loading
Loading