From b7b146393c809c42f64326ccae4b55330aaa6d98 Mon Sep 17 00:00:00 2001 From: Andras Toth <4157749+tothandras@users.noreply.github.com> Date: Fri, 12 Jun 2026 15:26:19 +0200 Subject: [PATCH 1/2] refactor(streaming): optimize sync batch inserts --- openmeter/streaming/clickhouse/connector.go | 68 +++++++++++++++---- .../streaming/clickhouse/connector_test.go | 36 +++++++--- openmeter/streaming/clickhouse/mock.go | 62 +++++++++++++++++ 3 files changed, 142 insertions(+), 24 deletions(-) diff --git a/openmeter/streaming/clickhouse/connector.go b/openmeter/streaming/clickhouse/connector.go index 837ba886f2..ffc935a1fb 100644 --- a/openmeter/streaming/clickhouse/connector.go +++ b/openmeter/streaming/clickhouse/connector.go @@ -258,29 +258,67 @@ func (c *Connector) CountEvents(ctx context.Context, namespace string, params st } func (c *Connector) BatchInsert(ctx context.Context, rawEvents []streaming.RawEvent) error { - var err error - - // Insert raw events - query := InsertEventsQuery{ - Database: c.config.Database, - EventsTableName: c.config.EventsTableName, - Events: rawEvents, - QuerySettings: c.config.InsertQuerySettings, - } - sql, args := query.ToSQL() - - // By default, ClickHouse is writing data synchronously. + // Async insert requires inline data in the INSERT statement, so it uses a textual query. // See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts if c.config.AsyncInsert { + query := InsertEventsQuery{ + Database: c.config.Database, + EventsTableName: c.config.EventsTableName, + Events: rawEvents, + QuerySettings: c.config.InsertQuerySettings, + } + sql, args := query.ToSQL() + // With the `wait_for_async_insert` setting, you can configure // if you want an insert statement to return with an acknowledgment // either immediately after the data got inserted into the buffer. - err = c.config.ClickHouse.Exec(clickhouse.Context(ctx, clickhouse.WithAsync(c.config.AsyncInsertWait)), sql, args...) - } else { - err = c.config.ClickHouse.Exec(ctx, sql, args...) + if err := c.config.ClickHouse.Exec(clickhouse.Context(ctx, clickhouse.WithAsync(c.config.AsyncInsertWait)), sql, args...); err != nil { + return fmt.Errorf("failed to batch insert raw events: %w", err) + } + + return nil + } + + // Synchronous inserts stream native columnar blocks, avoiding client-side + // parameter interpolation and server-side VALUES parsing on large batches. + if len(c.config.InsertQuerySettings) > 0 { + settings := clickhouse.Settings{} + for key, value := range c.config.InsertQuerySettings { + settings[key] = value + } + + ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings)) } + tableName := getTableName(c.config.Database, c.config.EventsTableName) + + batch, err := c.config.ClickHouse.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s (namespace, id, type, source, subject, time, data, ingested_at, stored_at, store_row_id)", tableName)) if err != nil { + return fmt.Errorf("failed to prepare batch insert: %w", err) + } + defer func() { + _ = batch.Close() + }() + + for _, event := range rawEvents { + err := batch.Append( + event.Namespace, + event.ID, + event.Type, + event.Source, + event.Subject, + event.Time, + event.Data, + event.IngestedAt, + event.StoredAt, + event.StoreRowID, + ) + if err != nil { + return fmt.Errorf("failed to append raw event to batch: %w", err) + } + } + + if err := batch.Send(); err != nil { return fmt.Errorf("failed to batch insert raw events: %w", err) } diff --git a/openmeter/streaming/clickhouse/connector_test.go b/openmeter/streaming/clickhouse/connector_test.go index f5da1ae57f..718c0828cc 100644 --- a/openmeter/streaming/clickhouse/connector_test.go +++ b/openmeter/streaming/clickhouse/connector_test.go @@ -176,8 +176,6 @@ func TestConnector_QueryMeter(t *testing.T) { } func TestBatchInsert(t *testing.T) { - connector, mockCH := GetMockConnector(t) - ctx := context.Background() now := time.Now().UTC() @@ -190,13 +188,33 @@ func TestBatchInsert(t *testing.T) { }, } - // Mock the batch insert - mockCH.On("Exec", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(nil).Once() + t.Run("SyncInsert", func(t *testing.T) { + connector, mockCH := GetMockConnector(t) - // Execute the method - err := connector.BatchInsert(ctx, events) - require.NoError(t, err) + mockBatch := NewMockBatch() + mockCH.On("PrepareBatch", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(mockBatch, nil).Once() + mockBatch.On("Append", mock.Anything).Return(nil).Once() + mockBatch.On("Send").Return(nil).Once() + mockBatch.On("Close").Return(nil).Once() - // Verify mocks were called - mockCH.AssertExpectations(t) + err := connector.BatchInsert(ctx, events) + require.NoError(t, err) + + mockCH.AssertExpectations(t) + mockBatch.AssertExpectations(t) + }) + + t.Run("AsyncInsert", func(t *testing.T) { + connector, mockCH := GetMockConnector(t, func(config Config) Config { + config.AsyncInsert = true + return config + }) + + mockCH.On("Exec", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(nil).Once() + + err := connector.BatchInsert(ctx, events) + require.NoError(t, err) + + mockCH.AssertExpectations(t) + }) } diff --git a/openmeter/streaming/clickhouse/mock.go b/openmeter/streaming/clickhouse/mock.go index 2056599a02..1fce992abe 100644 --- a/openmeter/streaming/clickhouse/mock.go +++ b/openmeter/streaming/clickhouse/mock.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/column" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/stretchr/testify/mock" ) @@ -74,6 +75,67 @@ func (m *MockClickHouse) Close() error { return callArgs.Error(0) } +var _ driver.Batch = &MockBatch{} + +func NewMockBatch() *MockBatch { + return &MockBatch{} +} + +// MockBatch is a mock for the Batch interface +type MockBatch struct { + mock.Mock +} + +func (m *MockBatch) Abort() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockBatch) Append(v ...any) error { + args := m.Called(v) + return args.Error(0) +} + +func (m *MockBatch) AppendStruct(v any) error { + args := m.Called(v) + return args.Error(0) +} + +func (m *MockBatch) Column(idx int) driver.BatchColumn { + args := m.Called(idx) + return args.Get(0).(driver.BatchColumn) +} + +func (m *MockBatch) Flush() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockBatch) Send() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockBatch) IsSent() bool { + args := m.Called() + return args.Bool(0) +} + +func (m *MockBatch) Rows() int { + args := m.Called() + return args.Int(0) +} + +func (m *MockBatch) Columns() []column.Interface { + args := m.Called() + return args.Get(0).([]column.Interface) +} + +func (m *MockBatch) Close() error { + args := m.Called() + return args.Error(0) +} + var _ driver.Rows = &MockRows{} func NewMockRows() *MockRows { From 292853319cd721f1c2c583f8fb37767492106cfc Mon Sep 17 00:00:00 2001 From: Andras Toth <4157749+tothandras@users.noreply.github.com> Date: Fri, 12 Jun 2026 16:14:08 +0200 Subject: [PATCH 2/2] fix: review comments --- app/common/streaming.go | 1 + app/config/aggregation.go | 10 ++ app/config/config_test.go | 5 +- app/config/sink.go | 32 ------ config.example.yaml | 14 +++ openmeter/streaming/clickhouse/connector.go | 20 +++- .../streaming/clickhouse/connector_test.go | 103 +++++++++++++++++- openmeter/streaming/clickhouse/event_query.go | 6 +- pkg/framework/clickhouseotel/otel.go | 73 +++++++++++++ 9 files changed, 222 insertions(+), 42 deletions(-) diff --git a/app/common/streaming.go b/app/common/streaming.go index 19480defd1..4b1c7658e0 100644 --- a/app/common/streaming.go +++ b/app/common/streaming.go @@ -38,6 +38,7 @@ func NewStreamingConnector( Logger: logger, AsyncInsert: conf.AsyncInsert, AsyncInsertWait: conf.AsyncInsertWait, + InsertTimeout: conf.InsertTimeout, InsertQuerySettings: conf.InsertQuerySettings, MeterQuerySettings: conf.MeterQuerySettings, EnablePrewhere: conf.EnablePrewhere, diff --git a/app/config/aggregation.go b/app/config/aggregation.go index da77ccbb73..ff2eceaf48 100644 --- a/app/config/aggregation.go +++ b/app/config/aggregation.go @@ -24,6 +24,11 @@ type AggregationConfiguration struct { // Setting true can cause silent errors that you need to monitor separately. AsyncInsertWait bool + // InsertTimeout bounds synchronous batch inserts when the caller's context has no + // deadline. The native batch path arms connection read deadlines only from the + // context, so without it a half-open connection could hang an insert indefinitely. + InsertTimeout time.Duration + // See https://clickhouse.com/docs/en/operations/settings/settings // For example, you can set the `max_insert_threads` setting to control the number of threads // or the `parallel_view_processing` setting to enable pushing to attached views concurrently. @@ -57,6 +62,10 @@ func (c AggregationConfiguration) Validate() error { return errors.New("async insert wait is set but async insert is not") } + if c.InsertTimeout <= 0 { + return errors.New("insert timeout must be greater than 0") + } + return nil } @@ -199,6 +208,7 @@ func ConfigureAggregation(v *viper.Viper) { v.SetDefault("aggregation.eventsTableName", "om_events") v.SetDefault("aggregation.asyncInsert", false) v.SetDefault("aggregation.asyncInsertWait", false) + v.SetDefault("aggregation.insertTimeout", "1m") v.SetDefault("aggregation.clickhouse.address", "127.0.0.1:9000") v.SetDefault("aggregation.clickhouse.tls", false) diff --git a/app/config/config_test.go b/app/config/config_test.go index 098c8298bf..1751d0d53d 100644 --- a/app/config/config_test.go +++ b/app/config/config_test.go @@ -160,6 +160,7 @@ func TestComplete(t *testing.T) { EventsTableName: "om_events", AsyncInsert: false, AsyncInsertWait: false, + InsertTimeout: time.Minute, }, Entitlements: EntitlementsConfiguration{ GracePeriod: datetime.ISODurationString("P1D"), @@ -259,10 +260,6 @@ func TestComplete(t *testing.T) { }, }, }, - Storage: StorageConfiguration{ - AsyncInsert: false, - AsyncInsertWait: false, - }, }, Dedupe: DedupeConfiguration{ Enabled: true, diff --git a/app/config/sink.go b/app/config/sink.go index e14f74484f..8464c9da7a 100644 --- a/app/config/sink.go +++ b/app/config/sink.go @@ -22,9 +22,6 @@ type SinkConfiguration struct { IngestNotifications IngestNotificationsConfiguration // Kafka client/Consumer configuration Kafka KafkaConfig - // TODO: remove, config moved to aggregation config - // Storage configuration - Storage StorageConfiguration // NamespaceRefetchTimeout is the timeout for updating namespaces and consumer subscription. // It must be less than NamespaceRefetch interval. @@ -108,30 +105,6 @@ func (c IngestNotificationsConfiguration) Validate() error { return errors.Join(errs...) } -type StorageConfiguration struct { - // Set true for ClickHouse first store the incoming inserts into an in-memory buffer - // before flushing them regularly to disk. - // See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts - AsyncInsert bool - // Set true if you want an insert statement to return with an acknowledgment immediately - // without waiting for the data got inserted into the buffer. - // Setting true can cause silent errors that you need to monitor separately. - AsyncInsertWait bool - - // See https://clickhouse.com/docs/en/operations/settings/settings - // For example, you can set the `max_insert_threads` setting to control the number of threads - // or the `parallel_view_processing` setting to enable pushing to attached views concurrently. - QuerySettings map[string]string -} - -func (c StorageConfiguration) Validate() error { - if c.AsyncInsertWait && !c.AsyncInsert { - return errors.New("AsyncInsertWait is set but AsyncInsert is not") - } - - return nil -} - // ConfigureSink setup Sink specific configuration defaults for provided *viper.Viper instance. func ConfigureSink(v *viper.Viper) { // Sink Dedupe @@ -167,11 +140,6 @@ func ConfigureSink(v *viper.Viper) { v.SetDefault("sink.meterRefetchInterval", "15s") v.SetDefault("sink.logDroppedEvents", false) - // TODO: remove, config moved to aggregation config - // Sink Storage - v.SetDefault("sink.storage.asyncInsert", false) - v.SetDefault("sink.storage.asyncInsertWait", false) - // Sink Kafka configuration ConfigureKafkaConfiguration(v, "sink") diff --git a/config.example.yaml b/config.example.yaml index b293dd37c6..34dd977b67 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -67,6 +67,20 @@ server: # # Setting it to 0 disables cache entry expiration. # cacheTTL: 5m +# aggregation: +# # With async insert ClickHouse buffers and batches inserts server-side. +# # See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts +# asyncInsert: false +# asyncInsertWait: false +# # insertTimeout bounds synchronous batch inserts when the caller provides no deadline. +# # Protects against half-open ClickHouse connections hanging the insert indefinitely. +# insertTimeout: 1m +# clickhouse: +# address: 127.0.0.1:9000 +# username: default +# password: default +# database: openmeter + # dedupe: # enabled: true # driver: redis diff --git a/openmeter/streaming/clickhouse/connector.go b/openmeter/streaming/clickhouse/connector.go index ffc935a1fb..f2b149b4f3 100644 --- a/openmeter/streaming/clickhouse/connector.go +++ b/openmeter/streaming/clickhouse/connector.go @@ -32,6 +32,7 @@ type Config struct { EventsTableName string AsyncInsert bool AsyncInsertWait bool + InsertTimeout time.Duration InsertQuerySettings map[string]string MeterQuerySettings map[string]string EnablePrewhere bool @@ -258,6 +259,10 @@ func (c *Connector) CountEvents(ctx context.Context, namespace string, params st } func (c *Connector) BatchInsert(ctx context.Context, rawEvents []streaming.RawEvent) error { + if len(rawEvents) == 0 { + return nil + } + // Async insert requires inline data in the INSERT statement, so it uses a textual query. // See https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts if c.config.AsyncInsert { @@ -290,9 +295,18 @@ func (c *Connector) BatchInsert(ctx context.Context, rawEvents []streaming.RawEv ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings)) } + // Unlike Exec, the batch path only arms connection read deadlines when the context + // carries one, so an unbounded context would hang forever on a half-open connection. + // Respect a caller-provided deadline; otherwise bound the insert by InsertTimeout. + if _, ok := ctx.Deadline(); !ok && c.config.InsertTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, c.config.InsertTimeout) + defer cancel() + } + tableName := getTableName(c.config.Database, c.config.EventsTableName) - batch, err := c.config.ClickHouse.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s (namespace, id, type, source, subject, time, data, ingested_at, stored_at, store_row_id)", tableName)) + batch, err := c.config.ClickHouse.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s (%s)", tableName, strings.Join(rawEventColumns, ", "))) if err != nil { return fmt.Errorf("failed to prepare batch insert: %w", err) } @@ -314,12 +328,12 @@ func (c *Connector) BatchInsert(ctx context.Context, rawEvents []streaming.RawEv event.StoreRowID, ) if err != nil { - return fmt.Errorf("failed to append raw event to batch: %w", err) + return fmt.Errorf("failed to append raw event to batch (namespace: %s, id: %s): %w", event.Namespace, event.ID, err) } } if err := batch.Send(); err != nil { - return fmt.Errorf("failed to batch insert raw events: %w", err) + return fmt.Errorf("failed to send raw events batch: %w", err) } return nil diff --git a/openmeter/streaming/clickhouse/connector_test.go b/openmeter/streaming/clickhouse/connector_test.go index 718c0828cc..c9b0e2c546 100644 --- a/openmeter/streaming/clickhouse/connector_test.go +++ b/openmeter/streaming/clickhouse/connector_test.go @@ -188,12 +188,26 @@ func TestBatchInsert(t *testing.T) { }, } + expectedInsertStatement := "INSERT INTO testdb.events (namespace, id, type, source, subject, time, data, ingested_at, stored_at, store_row_id)" + expectedAppendArgs := []any{ + events[0].Namespace, + events[0].ID, + events[0].Type, + events[0].Source, + events[0].Subject, + events[0].Time, + events[0].Data, + events[0].IngestedAt, + events[0].StoredAt, + events[0].StoreRowID, + } + t.Run("SyncInsert", func(t *testing.T) { connector, mockCH := GetMockConnector(t) mockBatch := NewMockBatch() - mockCH.On("PrepareBatch", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(mockBatch, nil).Once() - mockBatch.On("Append", mock.Anything).Return(nil).Once() + mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once() + mockBatch.On("Append", expectedAppendArgs).Return(nil).Once() mockBatch.On("Send").Return(nil).Once() mockBatch.On("Close").Return(nil).Once() @@ -204,6 +218,77 @@ func TestBatchInsert(t *testing.T) { mockBatch.AssertExpectations(t) }) + t.Run("SyncInsertMultipleEvents", func(t *testing.T) { + connector, mockCH := GetMockConnector(t) + + multiEvents := []streaming.RawEvent{events[0], {Namespace: "test-namespace-2", ID: "2", Time: now}} + + mockBatch := NewMockBatch() + mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once() + mockBatch.On("Append", mock.Anything).Return(nil).Times(len(multiEvents)) + mockBatch.On("Send").Return(nil).Once() + mockBatch.On("Close").Return(nil).Once() + + err := connector.BatchInsert(ctx, multiEvents) + require.NoError(t, err) + + mockCH.AssertExpectations(t) + mockBatch.AssertExpectations(t) + }) + + t.Run("SyncInsertEmptyBatch", func(t *testing.T) { + connector, mockCH := GetMockConnector(t) + + err := connector.BatchInsert(ctx, nil) + require.NoError(t, err) + + // No PrepareBatch or Exec calls expected + mockCH.AssertExpectations(t) + }) + + t.Run("SyncInsertPrepareBatchError", func(t *testing.T) { + connector, mockCH := GetMockConnector(t) + + mockBatch := NewMockBatch() + mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, errors.New("prepare failed")).Once() + + err := connector.BatchInsert(ctx, events) + require.ErrorContains(t, err, "failed to prepare batch insert") + + mockCH.AssertExpectations(t) + }) + + t.Run("SyncInsertAppendError", func(t *testing.T) { + connector, mockCH := GetMockConnector(t) + + mockBatch := NewMockBatch() + mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once() + mockBatch.On("Append", expectedAppendArgs).Return(errors.New("append failed")).Once() + mockBatch.On("Close").Return(nil).Once() + + err := connector.BatchInsert(ctx, events) + require.ErrorContains(t, err, "failed to append raw event to batch") + + mockCH.AssertExpectations(t) + mockBatch.AssertExpectations(t) + }) + + t.Run("SyncInsertSendError", func(t *testing.T) { + connector, mockCH := GetMockConnector(t) + + mockBatch := NewMockBatch() + mockCH.On("PrepareBatch", mock.Anything, expectedInsertStatement, mock.Anything).Return(mockBatch, nil).Once() + mockBatch.On("Append", expectedAppendArgs).Return(nil).Once() + mockBatch.On("Send").Return(errors.New("send failed")).Once() + mockBatch.On("Close").Return(nil).Once() + + err := connector.BatchInsert(ctx, events) + require.ErrorContains(t, err, "failed to send raw events batch") + + mockCH.AssertExpectations(t) + mockBatch.AssertExpectations(t) + }) + t.Run("AsyncInsert", func(t *testing.T) { connector, mockCH := GetMockConnector(t, func(config Config) Config { config.AsyncInsert = true @@ -217,4 +302,18 @@ func TestBatchInsert(t *testing.T) { mockCH.AssertExpectations(t) }) + + t.Run("AsyncInsertExecError", func(t *testing.T) { + connector, mockCH := GetMockConnector(t, func(config Config) Config { + config.AsyncInsert = true + return config + }) + + mockCH.On("Exec", mock.Anything, mock.AnythingOfType("string"), mock.Anything).Return(errors.New("exec failed")).Once() + + err := connector.BatchInsert(ctx, events) + require.ErrorContains(t, err, "failed to batch insert raw events") + + mockCH.AssertExpectations(t) + }) } diff --git a/openmeter/streaming/clickhouse/event_query.go b/openmeter/streaming/clickhouse/event_query.go index b592a14be8..25256a960e 100644 --- a/openmeter/streaming/clickhouse/event_query.go +++ b/openmeter/streaming/clickhouse/event_query.go @@ -191,6 +191,10 @@ func (d queryCountEvents) toSQL() (string, []interface{}) { return sql, args } +// rawEventColumns is the ordered list of event table columns populated by inserts. +// The order must match the value order in InsertEventsQuery.ToSQL and Connector.BatchInsert. +var rawEventColumns = []string{"namespace", "id", "type", "source", "subject", "time", "data", "ingested_at", "stored_at", "store_row_id"} + // Insert Events Query type InsertEventsQuery struct { Database string @@ -204,7 +208,7 @@ func (q InsertEventsQuery) ToSQL() (string, []interface{}) { query := sqlbuilder.ClickHouse.NewInsertBuilder() query.InsertInto(tableName) - query.Cols("namespace", "id", "type", "source", "subject", "time", "data", "ingested_at", "stored_at", "store_row_id") + query.Cols(rawEventColumns...) // Add settings var settings []string diff --git a/pkg/framework/clickhouseotel/otel.go b/pkg/framework/clickhouseotel/otel.go index 1992acf8fe..0bec99ed21 100644 --- a/pkg/framework/clickhouseotel/otel.go +++ b/pkg/framework/clickhouseotel/otel.go @@ -108,6 +108,79 @@ func (c *ClickHouseTracer) Exec(ctx context.Context, query string, args ...any) return nil } +func (c *ClickHouseTracer) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { + ctx, span := c.Tracer.Start(ctx, "clickhouse.PrepareBatch", trace.WithAttributes( + attribute.String("query", query), + )) + + batch, err := c.Conn.PrepareBatch(ctx, query, opts...) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.End() + + return batch, err + } + + return &tracedBatch{Batch: batch, span: span}, nil +} + +// tracedBatch records Append/Send errors on the PrepareBatch span and ends the span +// when the batch is finalized. Ending an already-ended span is a no-op, so the +// Send/Abort/Close finalizers do not need to coordinate. +type tracedBatch struct { + driver.Batch + + span trace.Span +} + +func (b *tracedBatch) Append(v ...any) error { + err := b.Batch.Append(v...) + if err != nil { + b.span.RecordError(err) + b.span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +func (b *tracedBatch) AppendStruct(v any) error { + err := b.Batch.AppendStruct(v) + if err != nil { + b.span.RecordError(err) + b.span.SetStatus(codes.Error, err.Error()) + } + + return err +} + +func (b *tracedBatch) Send() error { + b.span.SetAttributes(attribute.Int("rows", b.Batch.Rows())) + + err := b.Batch.Send() + if err != nil { + b.span.RecordError(err) + b.span.SetStatus(codes.Error, err.Error()) + } + b.span.End() + + return err +} + +func (b *tracedBatch) Abort() error { + err := b.Batch.Abort() + b.span.End() + + return err +} + +func (b *tracedBatch) Close() error { + err := b.Batch.Close() + b.span.End() + + return err +} + func (c *ClickHouseTracer) AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error { ctx, span := c.Tracer.Start(ctx, "clickhouse.AsyncInsert", trace.WithAttributes( attribute.String("query", query),