Skip to content

refactor(streaming): optimize sync batch inserts#4523

Open
tothandras wants to merge 2 commits into
mainfrom
refactor/streaming
Open

refactor(streaming): optimize sync batch inserts#4523
tothandras wants to merge 2 commits into
mainfrom
refactor/streaming

Conversation

@tothandras

@tothandras tothandras commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Summary by CodeRabbit

  • New Features

    • Added configurable insert timeout for synchronous batch inserts.
    • Added tracing for batch prepare/append/send operations.
  • Refactor

    • Split and streamlined synchronous vs asynchronous insert paths for more predictable behavior.
  • Configuration

    • New aggregation insert timeout setting and example config entries; removed legacy sink storage settings.
  • Tests

    • Expanded mocks and test cases to cover both sync and async insert flows.

@tothandras tothandras requested a review from a team as a code owner June 12, 2026 13:27
@tothandras tothandras added the release-note/misc Miscellaneous changes label Jun 12, 2026
@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 2e5d59d7-7906-45cc-84a1-05e5e663aeba

📥 Commits

Reviewing files that changed from the base of the PR and between b7b1463 and 2928533.

📒 Files selected for processing (9)
  • app/common/streaming.go
  • app/config/aggregation.go
  • app/config/config_test.go
  • app/config/sink.go
  • config.example.yaml
  • openmeter/streaming/clickhouse/connector.go
  • openmeter/streaming/clickhouse/connector_test.go
  • openmeter/streaming/clickhouse/event_query.go
  • pkg/framework/clickhouseotel/otel.go
💤 Files with no reviewable changes (1)
  • app/config/sink.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • openmeter/streaming/clickhouse/connector.go
  • openmeter/streaming/clickhouse/connector_test.go

📝 Walkthrough

Walkthrough

BatchInsert now has separate async and sync execution paths (async uses Exec with WithAsync; sync uses PrepareBatch, Append, Send and respects Config.InsertTimeout). Tests, mocks, configuration, query column ordering, and tracing were updated to support and observe the sync path.

Changes

Async/Sync Insert Path Separation

Layer / File(s) Summary
ClickHouse Batch mock infrastructure
openmeter/streaming/clickhouse/mock.go
Adds MockBatch and NewMockBatch, implements driver.Batch methods wired to testify/mock, and imports column for Columns() support.
BatchInsert dual-path implementation
openmeter/streaming/clickhouse/connector.go, openmeter/streaming/clickhouse/event_query.go
Adds Config.InsertTimeout and splits BatchInsert: async mode builds InsertEventsQuery and uses Exec with clickhouse.WithAsync(AsyncInsertWait); sync mode applies InsertQuerySettings, prepares an explicit-column INSERT batch, appends event fields per rawEventColumns, and calls Send.
Sync and async insert tests
openmeter/streaming/clickhouse/connector_test.go
Refactors TestBatchInsert into subtests: sync variants mock PrepareBatch and batch lifecycle (Append, Send, Close) including empty-batch and error cases; async variants set Config.AsyncInsert and mock Exec success/error.
Configuration and examples
app/common/streaming.go, app/config/aggregation.go, app/config/config_test.go, app/config/sink.go, config.example.yaml
Adds AggregationConfiguration.InsertTimeout, validates it (>0), sets default aggregation.insertTimeout to 1m, wires InsertTimeout into connector creation, updates config test expectations, removes sink.storage config/type/defaults, and adds aggregation ClickHouse example settings in config.example.yaml.
PrepareBatch tracing wrapper
pkg/framework/clickhouseotel/otel.go
Adds ClickHouseTracer.PrepareBatch and a tracedBatch that records span attributes/status for Append/AppendStruct/Send/Abort/Close.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 28.57% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: refactoring the streaming connector to optimize synchronous batch insert operations by adding timeout handling and distinct insert paths.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch refactor/streaming

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@greptile-apps

greptile-apps Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR replaces the synchronous Exec-based insert path in BatchInsert with the ClickHouse native columnar batch API (PrepareBatch / Append / Send), which avoids client-side parameter interpolation and server-side VALUES parsing on large batches. It also adds an InsertTimeout config field to guard against half-open connections hanging a batch insert indefinitely, removes the deprecated StorageConfiguration from sink config, and adds OTel tracing support for the new batch path.

  • Sync path rewritten: BatchInsert now calls PrepareBatch and streams columnar rows rather than embedding values in an INSERT SQL string; rawEventColumns is extracted as a shared slice to keep column order consistent between the two paths.
  • Timeout protection added: InsertTimeout (default 1 m) is applied only when the calling context carries no deadline, preventing indefinite hangs on the sync path; the async path is unchanged.
  • OTel tracing extended: tracedBatch wraps the returned driver.Batch and records errors on the span for Append, AppendStruct, and Send.

Confidence Score: 5/5

Safe to merge; the sync batch rewrite is well-guarded and the async path is untouched.

The core BatchInsert rewrite is correct: the defer for Close is registered only after a successful PrepareBatch, the InsertTimeout fallback fires only when the context has no deadline, and the shared rawEventColumns slice keeps both insert paths in sync. The only inconsistency is that tracedBatch.Abort does not record its error on the span, unlike every other method in that wrapper — but this does not affect data correctness.

pkg/framework/clickhouseotel/otel.go — the Abort method in tracedBatch omits span error recording.

Important Files Changed

Filename Overview
openmeter/streaming/clickhouse/connector.go Rewrites BatchInsert to use native columnar batch API for sync inserts; adds InsertTimeout fallback when context has no deadline; async path unchanged.
pkg/framework/clickhouseotel/otel.go Adds PrepareBatch tracing via a tracedBatch wrapper; Abort path omits span error recording unlike all other batch methods.
app/config/aggregation.go Adds InsertTimeout field with 1m default and >0 validation; wired correctly through to connector Config.
openmeter/streaming/clickhouse/connector_test.go Expands BatchInsert tests to cover sync/async paths, error conditions (PrepareBatch, Append, Send), and empty-batch short-circuit.
openmeter/streaming/clickhouse/event_query.go Extracts rawEventColumns slice shared between InsertEventsQuery.ToSQL and Connector.BatchInsert to prevent column-order drift.
app/config/sink.go Removes deprecated StorageConfiguration and its viper defaults, completing the migration to AggregationConfiguration.
app/common/streaming.go Passes InsertTimeout from AggregationConfiguration to connector Config.
app/config/config_test.go Updates config test to include InsertTimeout in expected AggregationConfiguration; removes StorageConfiguration assertion.
config.example.yaml Adds aggregation section documentation with asyncInsert, asyncInsertWait, insertTimeout, and ClickHouse connection fields.
openmeter/streaming/clickhouse/mock.go Adds MockBatch implementing driver.Batch interface; PrepareBatch already existed in MockClickHouse.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant Connector
    participant ClickHouseConn
    participant Batch

    Caller->>Connector: BatchInsert(ctx, rawEvents)

    alt "AsyncInsert = true"
        Connector->>ClickHouseConn: Exec(ctx+AsyncSettings, INSERT SQL with VALUES)
        ClickHouseConn-->>Connector: error / nil
    else Sync (default)
        Note over Connector: Apply InsertQuerySettings via WithSettings
        Note over Connector: Set InsertTimeout if ctx has no deadline
        Connector->>ClickHouseConn: PrepareBatch(ctx, INSERT INTO table cols)
        ClickHouseConn-->>Connector: Batch
        loop for each RawEvent
            Connector->>Batch: Append(namespace, id, type, ...)
            Batch-->>Connector: error / nil
        end
        Connector->>Batch: Send()
        Batch-->>Connector: error / nil
        Connector->>Batch: Close() deferred
    end

    Connector-->>Caller: error / nil
Loading

Fix All in Claude Code

Reviews (2): Last reviewed commit: "fix: review comments" | Re-trigger Greptile

Comment thread openmeter/streaming/clickhouse/connector.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@openmeter/streaming/clickhouse/connector_test.go`:
- Around line 191-205: The SyncInsert subtest only verifies Append was called
once and misses validating the exact values/order and error branches; update the
test for connector.BatchInsert (using GetMockConnector and NewMockBatch) to
assert PrepareBatch is called with the expected SQL/column list, and assert that
mockBatch.Append is called ten times (or called with the exact slice/values
matching the explicit columns in connector.go) verifying each appended record's
fields and order, then also add separate subtests that exercise and assert
behavior when PrepareBatch, Append, and Send return errors (mock their returns
and require the corresponding error from BatchInsert). Ensure you use
mockCH.AssertExpectations and mockBatch.AssertExpectations to verify these
stricter expectations.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 12929957-eaa7-42cc-8ef7-33b16646573d

📥 Commits

Reviewing files that changed from the base of the PR and between 097ee63 and b7b1463.

📒 Files selected for processing (3)
  • openmeter/streaming/clickhouse/connector.go
  • openmeter/streaming/clickhouse/connector_test.go
  • openmeter/streaming/clickhouse/mock.go

Comment thread openmeter/streaming/clickhouse/connector_test.go
Comment thread openmeter/streaming/clickhouse/connector.go
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-note/misc Miscellaneous changes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant