📝 Walkthrough
Adaptive export is rewritten into a node-local operator that applies ClickHouse schema, consumes kubescape events, writes attribution rows, and can drive streaming or passthrough Pixie fan-out. Supporting packages add identity hashing, parsing, storage, trigger, control, and streaming pipeline components.
Changes
Adaptive Export Operator
Sequence Diagram(s)
sequenceDiagram
participant main
participant Applier
participant Trigger
participant Controller
participant Sink
main->>Applier: Apply schema and verify Pixie tables
main->>Trigger: Subscribe to kubescape rows
Trigger-->>Controller: kubescape.Event
Controller->>Sink: Write attribution row
Controller->>Sink: Write Pixie rows
Estimated review effort: 🎯 5 (Critical) | ⏱️ ~90+ minutes
New env-gated background loop that runs the same PxL shape AE's anomaly-gated path uses, but with an empty Target (no ns/pod predicate) and over a configurable rolling window. Writes via the existing sink so the byte-shape of forensic_db rows is comparable between the PASSTHROUGH=1 phase (EVERYTHING) and the PASSTHROUGH=0 phase (AE-FILTER). One-shot A/B that yields the per-table capture fraction of the adaptive write path.
- internal/passthrough/passthrough.go: Loop + Config; defaults to 30s window / 30s refresh / clickhouse.PixieTables() table list.
- internal/passthrough/passthrough_test.go: 6 tests; the load-bearing one is TestLoop_EmitsEmptyTargetPxL (asserts neither df.namespace nor df.pod predicates appear in the emitted PxL).
- cmd/main.go: ADAPTIVE_PASSTHROUGH + _WINDOW_SEC + _REFRESH_SEC env knobs. Adapter is constructed unconditionally when passthrough is on (joins the existing PushPixie / streaming construction path so the same pxapi grpc stream is reused). Loop is registered with the shutdown WaitGroup so SIGTERM waits for the in-flight tick.
- cmd/BUILD.bazel: drop @px// load (other AE BUILD.bazel files use //bazel; sticking out as the only one with @px is a leftover from a prior gazelle run; align). Add passthrough dep.
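The empty-Target property that TestLoop_EmitsEmptyTargetPxL guards can be illustrated with a small sketch. Everything in it is hypothetical: `Target`, `buildPxL`, and `hasTargetPredicate` are illustration-only names, and the PxL text is a simplified stand-in, not the script AE actually emits.

```go
package main

import (
	"fmt"
	"strings"
)

// Target is a hypothetical stand-in for AE's target; empty fields mean "no predicate".
type Target struct {
	Namespace string
	Pod       string
}

// buildPxL renders a simplified PxL script for one table over a rolling window.
// Filters are emitted only for non-empty Target fields, so an empty Target
// yields an unfiltered (firehose) query.
func buildPxL(table string, windowSec int, t Target) string {
	var b strings.Builder
	fmt.Fprintf(&b, "import px\ndf = px.DataFrame(table=%q, start_time='-%ds')\n", table, windowSec)
	if t.Namespace != "" {
		fmt.Fprintf(&b, "df = df[df.namespace == %q]\n", t.Namespace)
	}
	if t.Pod != "" {
		fmt.Fprintf(&b, "df = df[df.pod == %q]\n", t.Pod)
	}
	b.WriteString("px.display(df)\n")
	return b.String()
}

// hasTargetPredicate reports whether the script filters on namespace or pod.
func hasTargetPredicate(src string) bool {
	return strings.Contains(src, "df.namespace") || strings.Contains(src, "df.pod")
}

func main() {
	// Passthrough phase: empty Target, so no predicate should appear.
	fmt.Println(hasTargetPredicate(buildPxL("http_events", 30, Target{})))
	// Anomaly-gated phase: a scoped Target adds a predicate.
	fmt.Println(hasTargetPredicate(buildPxL("http_events", 30, Target{Namespace: "prod"})))
}
```

Keeping one builder for both phases is what makes the A/B comparison meaningful: the only difference between the two runs is the presence of the predicates.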
Stand-alone workflow that builds entlein/dx (private Active-Diagnosis Framework) into ghcr.io/k8sstormcenter/dx-daemon. Separates the dx image publish from the bazel-based vizier_release pipeline; the dx repo ships its own Dockerfile.dxd (Go cross-compile + distroless final stage) so it doesn't need to live as a submodule inside src/vizier/services/dx.
Triggers:
- tag push 'release/dx/v*' on this repo cuts a release build, image tag derived from the tag suffix (release/dx/v0.1.0 -> image tag 0.1.0).
- workflow_dispatch lets us build any dx ref on demand with a custom tag (default: short sha of the resolved dx commit).
Pulls dx via DX_ENTLEIN_PAT (already configured on the repo). Multi-arch build (linux/amd64, linux/arm64); Dockerfile.dxd cross-compiles in the native BUILDPLATFORM stage and the final stage is COPY-only, so target emulation isn't required.
The dx image build pipeline lives in entlein/dx itself (PR #53, branch feat/bazel-release): bazel-based with @px external pin to pixie's ae-prod tip, pushes to docker.io/entlein/dx-daemon on a release/dx/v* tag in the dx repo. The pixie-side buildx workflow this reverts duplicated that intent in the wrong repo + the wrong build system (docker buildx instead of bazel + pl_go_image macros) + the wrong registry (ghcr.io/k8sstormcenter instead of docker.io/entlein).
adaptive_export: ADAPTIVE_PASSTHROUGH firehose loop (A/B capture-fraction measurement)
Actionable comments posted: 15
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In @.github/workflows/vizier_release.yaml:
- Line 18: The workflow uses an unrecognized self-hosted runner label in the
runs-on field ("oracle-vm-16cpu-64gb-x86-64"), which causes actionlint to fail;
either add this exact label to your actionlint self-hosted labels configuration
(the config key that lists allowed self-hosted labels) or change the runs-on
value in the workflow to a label already listed; update the runs-on entry in the
workflow or the actionlint config so that the label
"oracle-vm-16cpu-64gb-x86-64" is accepted by actionlint.
In `@src/vizier/services/adaptive_export/cmd/main.go`:
- Around line 506-514: Replace the raw http.ListenAndServe call with an explicit
http.Server instance configured with sensible timeouts (e.g., ReadTimeout,
ReadHeaderTimeout, WriteTimeout, IdleTimeout) and Addr set from CONTROL_ADDR and
Handler from ctrlSrv.Handler(); start it in the goroutine via
server.ListenAndServe() and keep the existing error handling (compare against
http.ErrServerClosed) and logging behavior for the control.New(activeSet, nil)
control surface. Ensure timeouts are chosen to protect the management surface
from slow clients and retain the existing log.WithField("addr", addr).Info and
log.WithError(err).Error flows.
In `@src/vizier/services/adaptive_export/internal/clickhouse/integration_test.go`:
- Around line 145-152: The test currently reuses a single 30s context
(ctx/cancel) for both Apply and VerifyPixieSchema causing deadline races; change
to create and use separate contexts (with their own timeouts and cancels) for
a.Apply(ctxApply) and a.VerifyPixieSchema(ctxVerify) so Apply cannot exhaust the
timeout needed for VerifyPixieSchema, ensuring you call the corresponding
cancel() after each operation; locate the calls to Apply and VerifyPixieSchema
in the test and replace the shared ctx/cancel usage with distinct contexts for
each call.
In `@src/vizier/services/adaptive_export/internal/clickhouse/schema.sql`:
- Around line 416-430: The ReplacingMergeTree for table adaptive_attribution
uses ORDER BY (hostname, anomaly_hash) which can collapse rows across different
namespace/pod values; either widen the primary key to include namespace and pod
(e.g., change ORDER BY to include namespace, pod) so the merge key is unique per
(hostname, namespace, pod, anomaly_hash) or ensure anomaly_hash is computed to
include namespace and pod; inspect the anomaly hash construction in
src/vizier/services/adaptive_export/internal/anomaly/hash.go (and any function
that builds anomaly_hash) and then update either the CREATE TABLE ORDER BY
clause or the hash construction so the join path (hostname, namespace, pod,
time) cannot lose rows.
In `@src/vizier/services/adaptive_export/internal/control/server_test.go`:
- Around line 115-129: Update TestBadInputRejected to include negative cases
that assert 400 responses for invalid time/window bounds: add do(...) calls that
post to the same endpoints using the test helper do with JSON payloads that
include (1) t_end <= 0 (e.g. {"pod":"p","table":"t","query_id":"x","t_end":0}),
(2) window [0,0] (e.g. {"pod":"p","table":"t","query_id":"x","window":[0,0]}),
and (3) window with end <= start (e.g.
{"pod":"p","table":"t","query_id":"x","window":[10,5]}), and assert each
returned r.StatusCode == http.StatusBadRequest; place these alongside the
existing checks in TestBadInputRejected so request-shape validation is locked by
tests.
In `@src/vizier/services/adaptive_export/internal/control/server.go`:
- Around line 110-159: The three handlers (handleStart, handleStop, handleQuery)
are currently unauthenticated; add an authorization check at the top of each
handler to reject unauthenticated/unauthorized callers before any state change
or query is performed. Implement or call a helper (e.g., s.requireAuth(r, w) or
s.authenticator.Authenticate(r)) that validates credentials (Authorization
header, token, mTLS, or IP restriction to localhost) and returns a boolean; if
it fails write an appropriate 401/403 response and return. Apply this check in
handleStart, handleStop, and handleQuery (before decode/processing and before
using s.set or s.runner) or refactor into a middleware wrapper used by all three
handlers. Ensure unauthorized requests never reach s.set.Upsert, s.set.Remove,
or s.runner.OrderQuery.
- Around line 116-154: The start and query handlers accept invalid times: add
validation in handleStart (after decode of targetReq) to reject default/zero
t_end by checking req.TEnd > 0 and return HTTP 400 if not; and in handleQuery
(after decode of queryReq) validate the Window values before calling
s.runner.OrderQuery—ensure req.Window has length 2, both entries > 0, and
req.Window[0] < req.Window[1], returning HTTP 400 for any violation; keep using
the existing req types (targetReq, queryReq) and the same response codes (400
for bad requests) and only proceed to s.set.Upsert or s.runner.OrderQuery when
validations pass.
In `@src/vizier/services/adaptive_export/internal/controller/controller.go`:
- Around line 26-29: Update the package header comment in controller.go to
reflect current behavior: remove or modify the lines claiming the controller
"does NOT execute PxL itself, does NOT write pixie observation rows, and does
NOT manage retention scripts" and instead state that the controller does execute
PxL and writes Pixie rows via pushPixieRows (and clarify its relationship to
retention scripts if applicable). Make sure to reference the controller package
and the pushPixieRows function in the comment so the contract accurately
describes the implementation.
- Around line 325-347: If c.sink.Write(ctx, []sink.AttributionRow{snapshot})
returns an error, do not call c.cfg.OnAttribution or spawn pushPixieRows: treat
persistence failure as fatal for downstream fan-out. Modify the block around
c.sink.Write so that when err != nil you log the error and skip both the
c.cfg.OnAttribution(snapshot.Namespace, snapshot.Pod, snapshot.TEnd) call and
the goroutine that calls c.pushPixieRows(ctx, snapshot) / manipulates c.inFlight
for hash; only execute OnAttribution and the spawn path when Write succeeds.
- Around line 243-255: Rehydrate restores c.active but never restarts the rev-1
fan-out goroutines; after populating c.active in Rehydrate, and after releasing
c.mu (do not spawn goroutines while holding the lock), detect rev-1 mode (e.g.,
c.mode or c.cfg.Mode) and for each restored row start the fan-out loop by
launching pushPixieRows in a goroutine with the row's AnomalyHash (ensure you
pass the hash as a value to avoid loop-variable capture and do not hold c.mu
while spawning).
In `@src/vizier/services/adaptive_export/internal/passthrough/passthrough.go`:
- Around line 133-147: The external calls l.q.Query(ctx, src) and
l.s.WritePixieRows(ctx, table, rows) must be executed with bounded per-table
contexts instead of the parent ctx to avoid hanging the sweep/tick loop; wrap
each call in a context.WithTimeout (use a sensible timeout value or an existing
field like l.tableTimeout if present) and defer cancel() before calling Query
and before calling WritePixieRows, propagate the new ctx into those calls, and
handle context.DeadlineExceeded as a warning similar to other errors so Run/tick
can continue promptly without blocking the select loop.
- Around line 81-84: The constructor (New) returns &Loop{... cfg: cfg} while
retaining cfg.Tables by reference; make a defensive copy of the slice before
storing it in Loop so external mutation or concurrent changes don't affect the
loop. Locate the New function and the Loop struct, and replace direct assignment
of cfg (or cfg.Tables) with a shallow copy where you allocate a new slice and
copy elements from cfg.Tables into it (use the new slice when setting cfg.Tables
stored in Loop), ensuring both the default-assignment branch (when
len(cfg.Tables)==0) and the caller-provided branch copy the slice before
constructing Loop.
In `@src/vizier/services/adaptive_export/internal/pixieapi/pixieapi.go`:
- Around line 87-89: The direct-mode dial code currently enforces the old
env/address gate and calls WithDisableTLSVerification, so change the direct
connection branch to use WithDirectTLSSkipVerify() instead and remove the
special-case guard that returns an error when opts.VizierAddr contains
"cluster.local" and PX_DISABLE_TLS != "1"; locate the direct-dial path that
inspects opts.VizierAddr and constructs the pxapi client (where
WithDisableTLSVerification is used) and replace that option with
WithDirectTLSSkipVerify(), ensuring other non-direct constructors still use the
existing TLS options.
In `@src/vizier/services/adaptive_export/internal/sink/fastencode.go`:
- Around line 188-195: The comment in the time.Time branch is incorrect about
avoiding an intermediate allocation: x.UTC().Format(...) returns a string which
is then passed to buf.WriteString, so this does allocate; update the comment
near the case time.Time / buf.WriteString / x.UTC().Format(...) to state that
Format returns a string (causing an intermediate allocation) and remove the
misleading claim about AppendFormat, or optionally mention that avoiding
allocation would require using a byte-append approach (e.g., time.AppendFormat)
if desired.
In `@src/vizier/services/adaptive_export/internal/streaming/filter.go`:
- Around line 194-196: When handling the receive from deltaCh in the select,
ensure you call disarm() before returning so any armed timer is stopped and
won't later fire and block on pendingC; update the case handling the
"<-u.deltaCh" branch to call disarm() (the same routine used elsewhere to
stop/cleanup the timer) and then return, guaranteeing the timer goroutine is
cleaned up when ActiveSet shuts down.
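The per-table timeout requested for passthrough.go in the list above can be sketched as follows. This is a minimal illustration under assumptions: `withTableTimeout`, `stalledCall`, and `sweep` are hypothetical names, the stalled call stands in for l.q.Query or l.s.WritePixieRows, and the table names are placeholders.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// withTableTimeout runs one external call under its own deadline so a stalled
// backend cannot hold up the rest of the sweep.
func withTableTimeout(parent context.Context, d time.Duration, call func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel() // always release the timer, even on success
	return call(ctx)
}

// stalledCall simulates a query or write that never returns on its own.
func stalledCall(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// isTimeout reports whether err is (or wraps) a deadline expiry.
func isTimeout(err error) bool { return errors.Is(err, context.DeadlineExceeded) }

// sweep visits every table, logs timeouts as warnings, and keeps going.
// It returns how many tables timed out.
func sweep(tables []string, d time.Duration) int {
	timedOut := 0
	for _, table := range tables {
		if err := withTableTimeout(context.Background(), d, stalledCall); isTimeout(err) {
			fmt.Printf("warn: %s timed out, moving on\n", table)
			timedOut++
		}
	}
	return timedOut
}

func main() {
	fmt.Println(sweep([]string{"http_events", "dns_events"}, 10*time.Millisecond))
}
```

In the real loop the query and the write would each get their own bounded context derived from the parent, so shutdown via the parent context still cancels both promptly.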
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 38e6ea52-8b8e-437c-a44f-4fcf35986e67
📒 Files selected for processing (75)
- .github/workflows/vizier_release.yaml
- skaffold/skaffold_vizier.yaml
- src/api/go/pxapi/opts.go
- src/stirling/source_connectors/socket_tracer/testing/container_images/BUILD.bazel
- src/vizier/services/adaptive_export/BUILD.bazel
- src/vizier/services/adaptive_export/cmd/BUILD.bazel
- src/vizier/services/adaptive_export/cmd/main.go
- src/vizier/services/adaptive_export/internal/activeset/BUILD.bazel
- src/vizier/services/adaptive_export/internal/activeset/activeset.go
- src/vizier/services/adaptive_export/internal/activeset/activeset_test.go
- src/vizier/services/adaptive_export/internal/anomaly/BUILD.bazel
- src/vizier/services/adaptive_export/internal/anomaly/hash.go
- src/vizier/services/adaptive_export/internal/anomaly/hash_bench_test.go
- src/vizier/services/adaptive_export/internal/anomaly/hash_test.go
- src/vizier/services/adaptive_export/internal/clickhouse/BUILD.bazel
- src/vizier/services/adaptive_export/internal/clickhouse/apply.go
- src/vizier/services/adaptive_export/internal/clickhouse/apply_test.go
- src/vizier/services/adaptive_export/internal/clickhouse/columns_test.go
- src/vizier/services/adaptive_export/internal/clickhouse/ddl.go
- src/vizier/services/adaptive_export/internal/clickhouse/ddl_test.go
- src/vizier/services/adaptive_export/internal/clickhouse/insert.go
- src/vizier/services/adaptive_export/internal/clickhouse/insert_test.go
- src/vizier/services/adaptive_export/internal/clickhouse/integration_test.go
- src/vizier/services/adaptive_export/internal/clickhouse/schema.sql
- src/vizier/services/adaptive_export/internal/config/BUILD.bazel
- src/vizier/services/adaptive_export/internal/config/definition.go
- src/vizier/services/adaptive_export/internal/control/BUILD.bazel
- src/vizier/services/adaptive_export/internal/control/server.go
- src/vizier/services/adaptive_export/internal/control/server_test.go
- src/vizier/services/adaptive_export/internal/controller/BUILD.bazel
- src/vizier/services/adaptive_export/internal/controller/controller.go
- src/vizier/services/adaptive_export/internal/controller/controller_test.go
- src/vizier/services/adaptive_export/internal/e2e/BUILD.bazel
- src/vizier/services/adaptive_export/internal/e2e/e2e_test.go
- src/vizier/services/adaptive_export/internal/kubescape/BUILD.bazel
- src/vizier/services/adaptive_export/internal/kubescape/extract.go
- src/vizier/services/adaptive_export/internal/kubescape/extract_test.go
- src/vizier/services/adaptive_export/internal/passthrough/BUILD.bazel
- src/vizier/services/adaptive_export/internal/passthrough/passthrough.go
- src/vizier/services/adaptive_export/internal/passthrough/passthrough_test.go
- src/vizier/services/adaptive_export/internal/pixie/pixie.go
- src/vizier/services/adaptive_export/internal/pixieapi/BUILD.bazel
- src/vizier/services/adaptive_export/internal/pixieapi/pixieapi.go
- src/vizier/services/adaptive_export/internal/pixieapi/pixieapi_test.go
- src/vizier/services/adaptive_export/internal/pxl/BUILD.bazel
- src/vizier/services/adaptive_export/internal/pxl/pxl.go
- src/vizier/services/adaptive_export/internal/pxl/queryfor.go
- src/vizier/services/adaptive_export/internal/pxl/queryfor_bench_test.go
- src/vizier/services/adaptive_export/internal/pxl/queryfor_test.go
- src/vizier/services/adaptive_export/internal/pxl/tables.go
- src/vizier/services/adaptive_export/internal/pxl/tables_test.go
- src/vizier/services/adaptive_export/internal/sink/BUILD.bazel
- src/vizier/services/adaptive_export/internal/sink/clickhouse.go
- src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go
- src/vizier/services/adaptive_export/internal/sink/encode_bench_test.go
- src/vizier/services/adaptive_export/internal/sink/fastencode.go
- src/vizier/services/adaptive_export/internal/sink/fastencode_test.go
- src/vizier/services/adaptive_export/internal/sink/integration_test.go
- src/vizier/services/adaptive_export/internal/streaming/BUILD.bazel
- src/vizier/services/adaptive_export/internal/streaming/filter.go
- src/vizier/services/adaptive_export/internal/streaming/filter_test.go
- src/vizier/services/adaptive_export/internal/streaming/integration_test.go
- src/vizier/services/adaptive_export/internal/streaming/notifier.go
- src/vizier/services/adaptive_export/internal/streaming/notifier_test.go
- src/vizier/services/adaptive_export/internal/streaming/scanner.go
- src/vizier/services/adaptive_export/internal/streaming/scanner_test.go
- src/vizier/services/adaptive_export/internal/streaming/supervisor.go
- src/vizier/services/adaptive_export/internal/streaming/writer.go
- src/vizier/services/adaptive_export/internal/trigger/BUILD.bazel
- src/vizier/services/adaptive_export/internal/trigger/clickhouse.go
- src/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
- src/vizier/services/adaptive_export/internal/trigger/fingerprint_bench_test.go
- src/vizier/services/adaptive_export/internal/trigger/integration_test.go
- src/vizier/services/adaptive_export/internal/trigger/watermark.go
- src/vizier/services/adaptive_export/internal/trigger/watermark_test.go
💤 Files with no reviewable changes (2)
- src/vizier/services/adaptive_export/internal/config/definition.go
- src/vizier/services/adaptive_export/internal/pxl/pxl.go
  build-release:
    name: Build Release
-   runs-on: oracle-16cpu-64gb-x86-64
+   runs-on: oracle-vm-16cpu-64gb-x86-64
Unknown runner label currently fails workflow linting.
oracle-vm-16cpu-64gb-x86-64 is not recognized by the current actionlint config, so CI fails pre-merge. Add this label to actionlint’s configured self-hosted labels (or revert to a configured label).
Also applies to: 143-143
🧰 Tools
🪛 actionlint (1.7.12)
[error] 18-18: label "oracle-vm-16cpu-64gb-x86-64" is unknown. available labels are "windows-latest", "windows-latest-8-cores", "windows-2025", "windows-2025-vs2026", "windows-2022", "windows-11-arm", "ubuntu-slim", "ubuntu-latest", "ubuntu-latest-4-cores", "ubuntu-latest-8-cores", "ubuntu-latest-16-cores", "ubuntu-24.04", "ubuntu-24.04-arm", "ubuntu-22.04", "ubuntu-22.04-arm", "macos-latest", "macos-latest-xlarge", "macos-latest-large", "macos-26-intel", "macos-26-xlarge", "macos-26-large", "macos-26", "macos-15-intel", "macos-15-xlarge", "macos-15-large", "macos-15", "macos-14-xlarge", "macos-14-large", "macos-14", "self-hosted", "x64", "arm", "arm64", "linux", "macos", "windows". if it is a custom label for self-hosted runner, set list of labels in actionlint.yaml config file
(runner-label)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In @.github/workflows/vizier_release.yaml at line 18, The workflow uses an
unrecognized self-hosted runner label in the runs-on field
("oracle-vm-16cpu-64gb-x86-64"), which causes actionlint to fail; either add
this exact label to your actionlint self-hosted labels configuration (the config
key that lists allowed self-hosted labels) or change the runs-on value in the
workflow to a label already listed; update the runs-on entry in the workflow or
the actionlint config so that the label "oracle-vm-16cpu-64gb-x86-64" is
accepted by actionlint.
Source: Linters/SAST tools
if addr := os.Getenv("CONTROL_ADDR"); addr != "" {
    ctrlSrv := control.New(activeSet, nil) // OrderQuery runner wired later
    go func() {
        log.WithField("addr", addr).Info("control surface listening")
        if err := http.ListenAndServe(addr, ctrlSrv.Handler()); err != nil &&
            err != http.ErrServerClosed {
            log.WithError(err).Error("control surface stopped")
        }
    }()
Use an http.Server with explicit timeouts for the control listener.
The raw http.ListenAndServe path has no header/read/write/idle timeouts, so a slow client can hold resources indefinitely on this management surface.
💡 Suggested hardening
  if addr := os.Getenv("CONTROL_ADDR"); addr != "" {
      ctrlSrv := control.New(activeSet, nil) // OrderQuery runner wired later
+     httpSrv := &http.Server{
+         Addr:              addr,
+         Handler:           ctrlSrv.Handler(),
+         ReadHeaderTimeout: 5 * time.Second,
+         ReadTimeout:       15 * time.Second,
+         WriteTimeout:      30 * time.Second,
+         IdleTimeout:       60 * time.Second,
+     }
      go func() {
          log.WithField("addr", addr).Info("control surface listening")
-         if err := http.ListenAndServe(addr, ctrlSrv.Handler()); err != nil &&
+         if err := httpSrv.ListenAndServe(); err != nil &&
              err != http.ErrServerClosed {
              log.WithError(err).Error("control surface stopped")
          }
      }()
  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/cmd/main.go` around lines 506 - 514,
Replace the raw http.ListenAndServe call with an explicit http.Server instance
configured with sensible timeouts (e.g., ReadTimeout, ReadHeaderTimeout,
WriteTimeout, IdleTimeout) and Addr set from CONTROL_ADDR and Handler from
ctrlSrv.Handler(); start it in the goroutine via server.ListenAndServe() and
keep the existing error handling (compare against http.ErrServerClosed) and
logging behavior for the control.New(activeSet, nil) control surface. Ensure
timeouts are chosen to protect the management surface from slow clients and
retain the existing log.WithField("addr", addr).Info and
log.WithError(err).Error flows.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Apply first so the test is order-independent w.r.t. TestApply_Live.
if err := a.Apply(ctx); err != nil {
    t.Fatalf("Apply (precondition): %v", err)
}
if err := a.VerifyPixieSchema(ctx); err != nil {
    t.Fatalf("VerifyPixieSchema: %v", err)
Use separate contexts for Apply and VerifyPixieSchema in the live verify test.
At Line 145, a single 30s timeout is shared across both operations. On slower clusters, Apply can consume most/all budget and make VerifyPixieSchema fail with deadline errors unrelated to schema correctness.
Suggested fix
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
- // Apply first so the test is order-independent w.r.t. TestApply_Live.
- if err := a.Apply(ctx); err != nil {
+ ctxApply, cancelApply := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancelApply()
+ // Apply first so the test is order-independent w.r.t. TestApply_Live.
+ if err := a.Apply(ctxApply); err != nil {
      t.Fatalf("Apply (precondition): %v", err)
  }
- if err := a.VerifyPixieSchema(ctx); err != nil {
+ ctxVerify, cancelVerify := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancelVerify()
+ if err := a.VerifyPixieSchema(ctxVerify); err != nil {
      t.Fatalf("VerifyPixieSchema: %v", err)
  }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/clickhouse/integration_test.go`
around lines 145 - 152, The test currently reuses a single 30s context
(ctx/cancel) for both Apply and VerifyPixieSchema causing deadline races; change
to create and use separate contexts (with their own timeouts and cancels) for
a.Apply(ctxApply) and a.VerifyPixieSchema(ctxVerify) so Apply cannot exhaust the
timeout needed for VerifyPixieSchema, ensuring you call the corresponding
cancel() after each operation; locate the calls to Apply and VerifyPixieSchema
in the test and replace the shared ctx/cancel usage with distinct contexts for
each call.
CREATE TABLE IF NOT EXISTS forensic_db.adaptive_attribution (
    anomaly_hash String,
    namespace String,
    pod String,
    comm String,
    pid UInt64,
    hostname String,
    t_start DateTime64(9, 'UTC'),
    t_end DateTime64(9, 'UTC'),
    last_seen DateTime64(9, 'UTC'),
    last_rule_id String,
    n_anomalies UInt64
) ENGINE = ReplacingMergeTree(t_end)
PARTITION BY toYYYYMM(t_start)
ORDER BY (hostname, anomaly_hash);
❓ Verification inconclusive
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n -C3 'anomaly_hash|namespace|pod|hostname|hash' \
  src/vizier/services/adaptive_export/internal/anomaly/hash.go \
  src/vizier/services/adaptive_export/internal/anomaly/hash_test.go \
  src/vizier/services/adaptive_export/internal/controller/controller.go \
  src/vizier/services/adaptive_export/internal/kubescape/extract.go
rg -n -C2 'CREATE TABLE IF NOT EXISTS forensic_db.adaptive_attribution|ORDER BY' \
  src/vizier/services/adaptive_export/internal/clickhouse/schema.sql
Repository: k8sstormcenter/pixie
Length of output: 1957
adaptive_attribution ReplacingMergeTree ORDER BY key may be too narrow for the join dimensions.
With ENGINE = ReplacingMergeTree(t_end) and ORDER BY (hostname, anomaly_hash), rows can overwrite/collapse across different namespace/pod values on the same hostname when anomaly_hash is not already namespace/pod-scoped—risking losing attribution rows needed by the (hostname, namespace, pod, time_) join path.
Proposed schema key change
  ) ENGINE = ReplacingMergeTree(t_end)
  PARTITION BY toYYYYMM(t_start)
- ORDER BY (hostname, anomaly_hash);
+ ORDER BY (hostname, namespace, pod, anomaly_hash);
Inspect how anomaly_hash is constructed and which fields it includes (e.g., in src/vizier/services/adaptive_export/internal/anomaly/hash.go). If namespace and/or pod are not inputs to anomaly_hash, the current ORDER BY (hostname, anomaly_hash) key is unsafe and should be widened as proposed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/clickhouse/schema.sql` around
lines 416 - 430, The ReplacingMergeTree for table adaptive_attribution uses
ORDER BY (hostname, anomaly_hash) which can collapse rows across different
namespace/pod values; either widen the primary key to include namespace and pod
(e.g., change ORDER BY to include namespace, pod) so the merge key is unique per
(hostname, namespace, pod, anomaly_hash) or ensure anomaly_hash is computed to
include namespace and pod; inspect the anomaly hash construction in
src/vizier/services/adaptive_export/internal/anomaly/hash.go (and any function
that builds anomaly_hash) and then update either the CREATE TABLE ORDER BY
clause or the hash construction so the join path (hostname, namespace, pod,
time) cannot lose rows.
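The second option above, folding namespace and pod into the hash itself, could look like the following sketch. It is not the construction in hash.go (the verification above was inconclusive about what that function hashes); `scopedHash` and its field list are hypothetical, and SHA-256 is chosen here only for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// scopedHash derives an identity hash from every dimension the join path uses.
// Hypothetical helper: the real anomaly hash may use different inputs.
func scopedHash(hostname, namespace, pod, ruleID string) string {
	h := sha256.New()
	for _, field := range []string{hostname, namespace, pod, ruleID} {
		// Length-prefix each field so ("ab", "c") and ("a", "bc") hash differently.
		fmt.Fprintf(h, "%d:%s|", len(field), field)
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := scopedHash("node-1", "prod", "api-0", "R0001")
	b := scopedHash("node-1", "prod", "api-1", "R0001")
	fmt.Println(a != b) // different pods on the same host get different keys
}
```

With a hash built this way, ORDER BY (hostname, anomaly_hash) is already unique per namespace and pod, so ReplacingMergeTree would only collapse rows that describe the same workload.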
func TestBadInputRejected(t *testing.T) {
    srv := New(&fakeExporter{}, &fakeRunner{})
    // missing pod
    if r := do(t, srv, http.MethodPost, "/export/start", `{"namespace":"n"}`); r.StatusCode != http.StatusBadRequest {
        t.Fatalf("start no-pod = %d, want 400", r.StatusCode)
    }
    // malformed json
    if r := do(t, srv, http.MethodPost, "/export/stop", `{not json`); r.StatusCode != http.StatusBadRequest {
        t.Fatalf("stop bad-json = %d, want 400", r.StatusCode)
    }
    // query missing table
    if r := do(t, srv, http.MethodPost, "/query", `{"pod":"p","query_id":"x","window":[1,2]}`); r.StatusCode != http.StatusBadRequest {
        t.Fatalf("query no-table = %d, want 400", r.StatusCode)
    }
}
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Extend bad-input tests for t_end and window bounds.
Please add negative cases for t_end<=0, window=[0,0], and window with end<=start so request-shape validation is locked in by tests.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/control/server_test.go` around
lines 115 - 129, Update TestBadInputRejected to include negative cases that
assert 400 responses for invalid time/window bounds: add do(...) calls that post
to the same endpoints using the test helper do with JSON payloads that include
(1) t_end <= 0 (e.g. {"pod":"p","table":"t","query_id":"x","t_end":0}), (2)
window [0,0] (e.g. {"pod":"p","table":"t","query_id":"x","window":[0,0]}), and
(3) window with end <= start (e.g.
{"pod":"p","table":"t","query_id":"x","window":[10,5]}), and assert each
returned r.StatusCode == http.StatusBadRequest; place these alongside the
existing checks in TestBadInputRejected so request-shape validation is locked by
tests.
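A minimal sketch of the bounds checks these negative cases would exercise. The helper names `validateTEnd` and `validateWindow` are hypothetical and not taken from the server code; the actual handler may validate differently (for example, it may also reject a zero start).

```go
package main

import (
	"errors"
	"fmt"
)

// validateTEnd rejects non-positive end timestamps (hypothetical rule).
func validateTEnd(tEnd int64) error {
	if tEnd <= 0 {
		return errors.New("t_end must be > 0")
	}
	return nil
}

// validateWindow rejects negative starts and windows whose end is not
// strictly after the start, which covers both [0,0] and [10,5].
func validateWindow(w [2]int64) error {
	if w[0] < 0 || w[1] <= w[0] {
		return errors.New("window must satisfy 0 <= start < end")
	}
	return nil
}

func main() {
	cases := []struct {
		name string
		err  error
	}{
		{"t_end=0", validateTEnd(0)},
		{"window=[0,0]", validateWindow([2]int64{0, 0})},
		{"window=[10,5]", validateWindow([2]int64{10, 5})},
		{"window=[1,2]", validateWindow([2]int64{1, 2})},
	}
	for _, c := range cases {
		fmt.Printf("%-14s rejected=%v\n", c.name, c.err != nil)
	}
}
```

In the real test these checks would be reached through the HTTP handler, with each invalid payload asserted to return http.StatusBadRequest.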
| if len(cfg.Tables) == 0 { | ||
| cfg.Tables = clickhouse.PixieTables() | ||
| } | ||
| return &Loop{q: q, s: s, cfg: cfg} |
Defensively copy cfg.Tables in New.
On lines 81-84, cfg.Tables is retained by reference when the caller provides it. Mutating the original slice after construction can change loop behavior (or race under concurrent mutation). Copy it before storing it in Loop.
Suggested fix
if len(cfg.Tables) == 0 {
cfg.Tables = clickhouse.PixieTables()
}
+ cfg.Tables = append([]string(nil), cfg.Tables...)
return &Loop{q: q, s: s, cfg: cfg}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if len(cfg.Tables) == 0 { | |
| cfg.Tables = clickhouse.PixieTables() | |
| } | |
| return &Loop{q: q, s: s, cfg: cfg} | |
| if len(cfg.Tables) == 0 { | |
| cfg.Tables = clickhouse.PixieTables() | |
| } | |
| cfg.Tables = append([]string(nil), cfg.Tables...) | |
| return &Loop{q: q, s: s, cfg: cfg} |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/passthrough/passthrough.go`
around lines 81 - 84, The constructor (New) returns &Loop{... cfg: cfg} while
retaining cfg.Tables by reference; make a defensive copy of the slice before
storing it in Loop so external mutation or concurrent changes don't affect the
loop. Locate the New function and the Loop struct, and replace direct assignment
of cfg (or cfg.Tables) with a shallow copy where you allocate a new slice and
copy elements from cfg.Tables into it (use the new slice when setting cfg.Tables
stored in Loop), ensuring both the default-assignment branch (when
len(cfg.Tables)==0) and the caller-provided branch copy the slice before
constructing Loop.
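The aliasing problem can be reproduced in isolation. In this sketch the `Config` and `Loop` types are stripped-down stand-ins for the real ones, and `NewAliased`/`NewCopied` are hypothetical names used only to contrast the two behaviors.

```go
package main

import "fmt"

// Config and Loop are minimal stand-ins for the passthrough types.
type Config struct{ Tables []string }
type Loop struct{ cfg Config }

// NewAliased keeps the caller's slice: both sides share one backing array.
func NewAliased(cfg Config) *Loop { return &Loop{cfg: cfg} }

// NewCopied clones the slice first, so later caller mutations are invisible.
func NewCopied(cfg Config) *Loop {
	cfg.Tables = append([]string(nil), cfg.Tables...)
	return &Loop{cfg: cfg}
}

func main() {
	tables := []string{"http_events", "dns_events"}
	aliased := NewAliased(Config{Tables: tables})
	copied := NewCopied(Config{Tables: tables})

	tables[0] = "mutated" // caller changes its slice after construction

	fmt.Println("aliased:", aliased.cfg.Tables[0])
	fmt.Println("copied: ", copied.cfg.Tables[0])
}
```

Passing Config by value does not help here, because copying a struct copies only the slice header, not the elements it points to.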
| rows, err := l.q.Query(ctx, src) | ||
| if err != nil { | ||
| log.WithError(err).WithField("table", table).Warn("ADAPTIVE_PASSTHROUGH: pixie query failed") | ||
| continue | ||
| } | ||
| if len(rows) == 0 { | ||
| log.WithField("table", table).Debug("ADAPTIVE_PASSTHROUGH: 0 rows") | ||
| continue | ||
| } | ||
| if err := l.s.WritePixieRows(ctx, table, rows); err != nil { | ||
| log.WithError(err).WithFields(log.Fields{ | ||
| "table": table, | ||
| "rows": len(rows), | ||
| }).Warn("ADAPTIVE_PASSTHROUGH: sink write failed") | ||
| continue |
Bound external query/write calls with per-table timeouts.
On lines 133 and 142, external calls run with the parent context only. A hung dependency call can stall the whole sweep and delay shutdown because Run cannot re-enter the select loop until tick returns. Add bounded contexts per table operation.
Suggested fix
@@
- rows, err := l.q.Query(ctx, src)
+ tableCtx, cancel := context.WithTimeout(ctx, l.cfg.Refresh)
+ rows, err := l.q.Query(tableCtx, src)
+ cancel()
if err != nil {
log.WithError(err).WithField("table", table).Warn("ADAPTIVE_PASSTHROUGH: pixie query failed")
continue
}
@@
- if err := l.s.WritePixieRows(ctx, table, rows); err != nil {
+ writeCtx, cancel := context.WithTimeout(ctx, l.cfg.Refresh)
+ err = l.s.WritePixieRows(writeCtx, table, rows)
+ cancel()
+ if err != nil {
log.WithError(err).WithFields(log.Fields{
"table": table,
"rows": len(rows),
}).Warn("ADAPTIVE_PASSTHROUGH: sink write failed")
continue
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/passthrough/passthrough.go`
around lines 133 - 147, The external calls l.q.Query(ctx, src) and
l.s.WritePixieRows(ctx, table, rows) must be executed with bounded per-table
contexts instead of the parent ctx to avoid hanging the sweep/tick loop; wrap
each call in a context.WithTimeout (use a sensible timeout value or an existing
field like l.tableTimeout if present) and call cancel() as soon as each call
returns (a bare defer inside the per-table loop would hold every context until
tick returns), propagate the new ctx into those calls, and
handle context.DeadlineExceeded as a warning similar to other errors so Run/tick
can continue promptly without blocking the select loop.
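One way to bound each table's work is to run it inside a small closure, so that a deferred cancel fires once per table instead of piling up until the sweep ends. The sketch below assumes a hypothetical `queryFn` signature and a `sweep` helper; neither is the real Loop API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// queryFn stands in for the loop's querier (hypothetical signature).
type queryFn func(ctx context.Context, table string) ([]string, error)

// sweep runs q once per table, each call under its own deadline, and records
// the per-table error so one hung table cannot stall the others.
func sweep(ctx context.Context, tables []string, perTable time.Duration, q queryFn) map[string]error {
	results := make(map[string]error, len(tables))
	for _, table := range tables {
		func() {
			tctx, cancel := context.WithTimeout(ctx, perTable)
			defer cancel() // runs when this closure returns, i.e. once per table
			_, err := q(tctx, table)
			results[table] = err
		}()
	}
	return results
}

// slowOn returns a queryFn that blocks on one table until its context ends.
func slowOn(slow string) queryFn {
	return func(ctx context.Context, table string) ([]string, error) {
		if table == slow {
			<-ctx.Done()
			return nil, ctx.Err()
		}
		return []string{"row"}, nil
	}
}

// demo reports whether the slow table timed out and the other one succeeded.
func demo() (slowTimedOut, otherOK bool) {
	res := sweep(context.Background(), []string{"http_events", "dns_events"},
		20*time.Millisecond, slowOn("http_events"))
	return errors.Is(res["http_events"], context.DeadlineExceeded), res["dns_events"] == nil
}

func main() {
	slowTimedOut, otherOK := demo()
	fmt.Println("slow table timed out:", slowTimedOut)
	fmt.Println("other table ok:      ", otherOK)
}
```

The timeout only helps if the underlying client honors context cancellation; a call that ignores ctx would still block.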
| if strings.Contains(opts.VizierAddr, "cluster.local") && os.Getenv("PX_DISABLE_TLS") != "1" { | ||
| return nil, errors.New("pixieapi: PX_DISABLE_TLS=1 required for direct cluster.local connections (pxapi's TLS-skip is gated on that env)") | ||
| } |
Direct-mode dial still uses the wrong TLS-skip option.
This path still depends on WithDisableTLSVerification(...), so skip-verify remains env/address-gated and does not align with the new direct dial contract. Use WithDirectTLSSkipVerify() in direct mode and remove the constructor gate that only exists to avoid the old WithDisableTLSVerification behavior.
🔧 Proposed fix
- if strings.Contains(opts.VizierAddr, "cluster.local") && os.Getenv("PX_DISABLE_TLS") != "1" {
- return nil, errors.New("pixieapi: PX_DISABLE_TLS=1 required for direct cluster.local connections (pxapi's TLS-skip is gated on that env)")
- }
return &Adapter{clusterID: clusterID, directOpts: &opts}, nil
@@
c, err := pxapi.NewClient(ctx,
pxapi.WithCloudAddr(a.directOpts.VizierAddr),
- pxapi.WithDisableTLSVerification(a.directOpts.VizierAddr),
+ pxapi.WithDirectTLSSkipVerify(),
pxapi.WithBearerAuth(jwt),
)
#!/bin/bash
# Verify direct-mode still uses env-gated TLS skip instead of the new direct option.
rg -n --type=go 'WithDisableTLSVerification|WithDirectTLSSkipVerify' \
src/vizier/services/adaptive_export/internal/pixieapi/pixieapi.go \
src/api/go/pxapi/opts.go -C2
Also applies to: 140-143
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/pixieapi/pixieapi.go` around
lines 87 - 89, The direct-mode dial code currently enforces the old env/address
gate and calls WithDisableTLSVerification, so change the direct connection
branch to use WithDirectTLSSkipVerify() instead and remove the special-case
guard that returns an error when opts.VizierAddr contains "cluster.local" and
PX_DISABLE_TLS != "1"; locate the direct-dial path that inspects opts.VizierAddr
and constructs the pxapi client (where WithDisableTLSVerification is used) and
replace that option with WithDirectTLSSkipVerify(), ensuring other non-direct
constructors still use the existing TLS options.
| case time.Time: | ||
| // Same format normalisePixieValue uses for the encoding/json | ||
| // path — CH DateTime64 string input shape. | ||
| buf.WriteByte('"') | ||
| // AppendFormat reuses the buf's underlying bytes; no | ||
| // intermediate string allocation. | ||
| buf.WriteString(x.UTC().Format("2006-01-02 15:04:05.000000000")) | ||
| buf.WriteByte('"') |
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Minor comment inaccuracy about string allocation.
The comment on lines 192-193 states "AppendFormat reuses the buf's underlying bytes; no intermediate string allocation" but the code calls x.UTC().Format(...), which returns a string, and then passes it to WriteString. This allocates an intermediate string.
time.Time does provide an AppendFormat method (it appends to a []byte), but this code does not call it. Using Format here is correct; only the comment is misleading.
📝 Suggested comment correction
case time.Time:
// Same format normalisePixieValue uses for the encoding/json
// path — CH DateTime64 string input shape.
buf.WriteByte('"')
- // AppendFormat reuses the buf's underlying bytes; no
- // intermediate string allocation.
+ // Format() allocates an intermediate string; AppendFormat is not used here.
buf.WriteString(x.UTC().Format("2006-01-02 15:04:05.000000000"))
buf.WriteByte('"')
📝 Committable suggestion
| case time.Time: | |
| // Same format normalisePixieValue uses for the encoding/json | |
| // path — CH DateTime64 string input shape. | |
| buf.WriteByte('"') | |
| // AppendFormat reuses the buf's underlying bytes; no | |
| // intermediate string allocation. | |
| buf.WriteString(x.UTC().Format("2006-01-02 15:04:05.000000000")) | |
| buf.WriteByte('"') | |
| case time.Time: | |
| // Same format normalisePixieValue uses for the encoding/json | |
| // path — CH DateTime64 string input shape. | |
| buf.WriteByte('"') | |
| // Format() allocates an intermediate string; AppendFormat is not used here. | |
| buf.WriteString(x.UTC().Format("2006-01-02 15:04:05.000000000")) | |
| buf.WriteByte('"') |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/sink/fastencode.go` around lines
188 - 195, The comment in the time.Time branch is incorrect about avoiding an
intermediate allocation: x.UTC().Format(...) returns a string which is then
passed to buf.WriteString, so this does allocate; update the comment near the
case time.Time / buf.WriteString / x.UTC().Format(...) to state that Format
returns a string (causing an intermediate allocation) and remove the misleading
claim about AppendFormat, or optionally mention that avoiding allocation would
require using a byte-append approach (e.g., the time.Time AppendFormat method) if desired.
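For reference, a byte-append variant could look like the sketch below. It uses the standard library's `Time.AppendFormat` on a plain []byte; `appendQuotedTime` and `sample` are hypothetical names, and how the result would be wired into the encoder's buffer is left open since that code is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// chLayout is the layout string quoted in the review (nanosecond precision).
const chLayout = "2006-01-02 15:04:05.000000000"

// appendQuotedTime appends t, in UTC and wrapped in double quotes, to dst.
// AppendFormat writes straight into dst, so no intermediate string is built.
func appendQuotedTime(dst []byte, t time.Time) []byte {
	dst = append(dst, '"')
	dst = t.UTC().AppendFormat(dst, chLayout)
	return append(dst, '"')
}

// sample formats a fixed timestamp into a preallocated scratch slice.
func sample() string {
	t := time.Date(2024, 1, 2, 3, 4, 5, 6, time.UTC)
	return string(appendQuotedTime(make([]byte, 0, 32), t))
}

func main() {
	fmt.Println(sample()) // "2024-01-02 03:04:05.000000006" (with the quotes)
}
```

Whether the saved allocation matters depends on how hot this branch is; for a comment-only fix, keeping Format and correcting the comment is the smaller change.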
| case _, ok := <-u.deltaCh: | ||
| if !ok { | ||
| return |
Timer leak on deltaCh close.
When deltaCh is closed (ActiveSet shutdown), the function returns without calling disarm(). If a timer is armed, it will fire and block trying to send on pendingC, leaking the timer's goroutine.
🔒 Proposed fix
case _, ok := <-u.deltaCh:
if !ok {
+ disarm()
return
}
arm()
📝 Committable suggestion
| case _, ok := <-u.deltaCh: | |
| if !ok { | |
| return | |
| case _, ok := <-u.deltaCh: | |
| if !ok { | |
| disarm() | |
| return | |
| } | |
| arm() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/streaming/filter.go` around
lines 194 - 196, When handling the receive from deltaCh in the select, ensure
you call disarm() before returning so any armed timer is stopped and won't later
fire and block on pendingC; update the case handling the "<-u.deltaCh" branch to
call disarm() (the same routine used elsewhere to stop/cleanup the timer) and
then return, guaranteeing the timer goroutine is cleaned up when ActiveSet shuts
down.
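The disarm-on-close pattern can be sketched independently of the real filter. Here `debounce` is a hypothetical, simplified loop built on time.AfterFunc rather than the select-based loop in filter.go, so it illustrates the cleanup rule, not the actual implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// debounce (re)arms a timer for every value received on deltaCh and calls
// fire when the timer expires. It returns once deltaCh is closed, stopping
// any timer that is still armed so fire cannot run after shutdown.
func debounce(deltaCh <-chan struct{}, delay time.Duration, fire func()) {
	var timer *time.Timer
	disarm := func() {
		if timer != nil {
			timer.Stop()
			timer = nil
		}
	}
	arm := func() {
		disarm()
		timer = time.AfterFunc(delay, fire)
	}
	for range deltaCh {
		arm()
	}
	disarm() // channel closed: without this the armed timer would still fire
}

// demo arms the timer once, shuts down before it expires, and returns how
// many times fire ran.
func demo() int32 {
	var fired int32
	ch := make(chan struct{})
	done := make(chan struct{})
	go func() {
		debounce(ch, 50*time.Millisecond, func() { atomic.AddInt32(&fired, 1) })
		close(done)
	}()
	ch <- struct{}{} // arms the timer
	close(ch)        // shutdown before the timer expires
	<-done
	time.Sleep(100 * time.Millisecond) // give a leaked timer time to fire
	return atomic.LoadInt32(&fired)
}

func main() {
	fmt.Println("fires after shutdown:", demo())
}
```

Removing the final disarm() call makes the counter reach 1, which is the after-shutdown firing the review comment warns about.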
Summary: Productionise the adaptive-export (AE) service on the k8sstormcenter fork: streaming export of pixie protocol tables into forensic_db, write-integrity assertions, schema rev-2 (anomaly_hashes on every protocol table), conn_stats persistence, ADAPTIVE_PASSTHROUGH (firehose) for A/B capture-fraction measurement, and the supporting CI + release-pipeline cleanup (copybara fork-ignore, ghcr migration, perf workflows).
Test Plan:
1) go test ./src/vizier/services/adaptive_export/... (all unit + integration tests green: controller, sink, trigger, pxl, anomaly, kubescape, window, clickhouse, passthrough).
2) local-ci.sh phase 9 (perf-eval-soc-attack on local k3s + AOCC pixie-cloud): passed 6/6, 698KB parquet, 43,944 redis_events rows, zero PEM/Kelvin/query-broker restarts.
3) Release pipeline: cut release/vizier/v0.14.19-aeprod-clean3 against ef7525c. The vizier-release workflow was GREEN end-to-end (Build Release 44m, Create Release on Github, SBOM upload), and multi-arch ghcr.io/k8sstormcenter/vizier-adaptive_export_image:0.14.19-aeprod-clean3 was published.
Type of change: /kind feature
Changes spanning this PR (33 commits on top of main):
2db5a9e62); ADAPTIVE_PASSTHROUGH firehose for A/B capture-fraction (45130be54); schema rev-2 with anomaly_hashes column and conn_stats persistence; per-table fault isolation; configurable rehydrate timeout.