From ff094843eb493140555f213adb77c0375aca3aa3 Mon Sep 17 00:00:00 2001 From: Lukas Wuttke Date: Tue, 2 Jun 2026 10:46:16 +0200 Subject: [PATCH] feat(dataset push): support the tabular / time-series modality family Extends `dataset push` from image_classification-only to also cover tabular_classification, tabular_regression, time_series_forecasting, and time_to_event_prediction. The Python ingestor already supports these; this adds the CLI-side flag / layout / spec surface. Validated end-to-end on a live cluster (tabular_classification, 8/8 records, 100%, rows confirmed in MySQL). - Category dispatch (push/category.go): image vs tabular families, mirroring data-ingestors' conventions.py groupings. - Tabular local layout (push/tabular.go): a single CSV (no sidecar files), staged via the existing machinery (CSV + empty image list). - Schema: auto-inferred from the CSV (INT/FLOAT/VARCHAR) so customers don't hand-write one; --schema col:TYPE,... overrides. Reserved framework columns (id, data_id, ...) are skipped so a CSV carrying an id column doesn't produce a schema the ingestor rejects (the #135b guard). - Label: string form for tabular_classification; object form with policy=bucket (default) for the regression-class categories so the raw numeric target never leaves the cluster. Added --label-policy and --time-column. - Build() branches by category (push/spec.go); pre-flight is category-aware (data CSV + column count for tabular). Tests: push/tabular_test.go (DiscoverTabular, InferSchema incl. reserved-skip, ParseSchema); spec_test.go (tabular Build passes the schema for all three label shapes, regression defaults to bucket); updated the unsupported-category gate test. go build / vet / test green. Stacked on cli#12 (the dataset-push live-ingestion fixes). Co-Authored-By: Claude Opus 4.8 --- internal/cli/dataset.go | 195 ++++++++++++++++++++--------- internal/cli/dataset_test.go | 20 +-- internal/push/category.go | 51 ++++++++ internal/push/spec.go | 81 ++++++++++-- internal/push/spec_test.go | 78 ++++++++++++ internal/push/tabular.go | 229 ++++++++++++++++++++++++++++++++++ internal/push/tabular_test.go | 153 +++++++++++++++++++++++ 7 files changed, 728 insertions(+), 79 deletions(-) create mode 100644 internal/push/category.go create mode 100644 internal/push/tabular.go create mode 100644 internal/push/tabular_test.go diff --git a/internal/cli/dataset.go b/internal/cli/dataset.go index eb97ea2..23e9b9e 100644 --- a/internal/cli/dataset.go +++ b/internal/cli/dataset.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "path/filepath" + "strings" "github.com/spf13/cobra" "gopkg.in/yaml.v3" @@ -68,14 +69,17 @@ func newDatasetPushCmd() *cobra.Command { contextOverride string nsOverride string - // Ingest-spec flags. The set is intentionally - // image_classification-only for v0.1 per epic #147 - // non-goals; other categories are one-PR additions in v0.2. + // Ingest-spec flags. image_classification + the tabular / + // time-series family are supported today; text + detection + + // segmentation land in later increments. table string category string intent string labelColumn string targetSize string + schemaFlag string + labelPolicy string + timeColumn string // Operations flags. dryRun bool @@ -142,12 +146,16 @@ Exit codes: RunE: func(cmd *cobra.Command, args []string) error { return runDatasetPush(cmd.Context(), cmd.OutOrStdout(), cmd.ErrOrStderr(), runDatasetPushArgs{ - LocalPath: args[0], - Kubeconfig: kubeconfigPath, - Context: contextOverride, - Namespace: nsOverride, - Spec: push.SpecArgs{Table: table, Category: category, Intent: intent, LabelColumn: labelColumn}, + LocalPath: args[0], + Kubeconfig: kubeconfigPath, + Context: contextOverride, + Namespace: nsOverride, + Spec: push.SpecArgs{ + Table: table, Category: category, Intent: intent, + LabelColumn: labelColumn, LabelPolicy: labelPolicy, TimeColumn: timeColumn, + }, TargetSizeFlag: targetSize, + SchemaFlag: schemaFlag, DryRun: dryRun, IngestorSAName: ingestorSAName, StagePodImage: stagePodImage, @@ -173,14 +181,23 @@ Exit codes: cmd.Flags().StringVar(&table, "table", "", "destination table name (MySQL identifier; matches /data/shared// on the PVC)") cmd.Flags().StringVar(&category, "category", "image_classification", - "task category (v0.1 only supports image_classification; see tracebloc/client#147 non-goals)") + "task category: image_classification, tabular_classification, tabular_regression, "+ + "time_series_forecasting, time_to_event_prediction") cmd.Flags().StringVar(&intent, "intent", "", "intent: train|test") cmd.Flags().StringVar(&labelColumn, "label-column", "", - "column name in labels.csv that holds the label") + "name of the label/target column (in labels.csv for image categories, in the data CSV for tabular)") cmd.Flags().StringVar(&targetSize, "target-size", "", - "image resolution as WxH (e.g. 512x512). Default: auto-detected from the first image. "+ + "image categories only: resolution as WxH (e.g. 512x512). Default: auto-detected from the first image. "+ "All images must share this resolution — the ingestor validates it, it does not resize.") + cmd.Flags().StringVar(&schemaFlag, "schema", "", + "tabular/time-series only: column types as col:TYPE,col:TYPE (e.g. age:INT,price:FLOAT). "+ + "Default: inferred from the CSV (INT/FLOAT/VARCHAR).") + cmd.Flags().StringVar(&labelPolicy, "label-policy", "", + "regression-class only (tabular_regression, time_series_forecasting, time_to_event_prediction): "+ + "passthrough|bucket (default bucket — bins the target so the raw value never leaves the cluster)") + cmd.Flags().StringVar(&timeColumn, "time-column", "", + "time_to_event_prediction only: name of the time/duration column (default: a column named \"time\")") cmd.Flags().BoolVar(&dryRun, "dry-run", false, "validate + discover + walk, but don't create any cluster resources") @@ -215,7 +232,8 @@ type runDatasetPushArgs struct { Context string Namespace string Spec push.SpecArgs - TargetSizeFlag string // raw --target-size; resolved after Discover + TargetSizeFlag string // raw --target-size; resolved after Discover (image) + SchemaFlag string // raw --schema; resolved or inferred after Discover (tabular) DryRun bool IngestorSAName string StagePodImage string @@ -249,56 +267,103 @@ func runDatasetPush(ctx context.Context, out, errOut io.Writer, a runDatasetPush return &exitError{code: 2, err: err} } - // 2. v0.1 category gate. Runs BEFORE schema validation because - // schema-valid-but-unsupported categories (e.g. - // tabular_classification) would otherwise fail with the - // schema's "missing property 'schema'" message — confusing - // for the customer who has no way to set --schema in v0.1. - // Nonsense categories (typos) also hit this gate; the - // "only image_classification in v0.1" message is more - // actionable than the schema's 11-option enum list anyway. - // Bugbot's review-on-self caught the missing gate on PR-a. - if a.Spec.Category != "" && a.Spec.Category != "image_classification" { + // 2. Category gate. Runs BEFORE schema validation so an + // unsupported category gets a clear, actionable CLI message + // rather than the schema's terse enum / missing-property error. + // Supported today: image_classification + the tabular / + // time-series family. The other image categories need sidecar + // (annotation/mask) staging the CLI doesn't do yet, and the + // text family needs a texts/sequences dir — both land in later + // increments. A typo'd category also lands here with a clear + // list rather than the schema's 11-option enum dump. + switch { + case a.Spec.Category == "": + // Left empty by a caller; let the schema produce the canonical + // "category is required" error downstream. + case push.IsTabular(a.Spec.Category) || a.Spec.Category == "image_classification": + // supported + case push.IsImage(a.Spec.Category): return &exitError{code: 2, err: fmt.Errorf( - "category %q is not supported in v0.1 (only image_classification). "+ - "Other categories are one-PR additions in v0.2 — see "+ - "tracebloc/client#147 non-goals.", a.Spec.Category)} + "category %q isn't supported by the CLI yet — it needs annotation/mask "+ + "sidecar staging that's coming in a later release. Supported image "+ + "category: image_classification.", a.Spec.Category)} + default: + return &exitError{code: 2, err: fmt.Errorf( + "category %q isn't supported by the CLI yet. Supported: image_classification, "+ + "tabular_classification, tabular_regression, time_series_forecasting, "+ + "time_to_event_prediction. (Text / detection / segmentation are coming; "+ + "use the helm flow for those meanwhile.)", a.Spec.Category)} } - // 3. Walk the local directory FIRST. Enforces layout + size caps, - // and gives us the image list the target-size auto-detect below - // needs. Both this and the schema check are local "fail fast" - // steps; doing the walk first lets the synthesized spec carry - // the resolved target_size. - layout, err := push.Discover(a.LocalPath) + // 3. Walk the local directory FIRST (local "fail fast"), dispatched + // by category family. Image categories expect labels.csv + + // images/; tabular / time-series categories expect a single + // data CSV. The walk also yields what the per-category + // resolution below needs (the image list for target-size, the + // CSV for schema inference). + var ( + layout *push.LocalLayout + err error + ) + if push.IsTabular(a.Spec.Category) { + layout, err = push.DiscoverTabular(a.LocalPath) + } else { + layout, err = push.Discover(a.LocalPath) + } if err != nil { return &exitError{code: 3, err: err} } - // 3a. Resolve the image target resolution. The ingestor's - // image_classification default is 512x512 and it VALIDATES - // (it does not resize), so a mismatch hard-fails the run with - // an "incorrect resolution" error. Honour an explicit - // --target-size; otherwise auto-detect from the first image so - // the common "all my images are NxN" case just works without - // the customer needing to know the knob exists. - if a.TargetSizeFlag != "" { - w, h, perr := push.ParseTargetSize(a.TargetSizeFlag) - if perr != nil { - return &exitError{code: 2, err: perr} + // 3a. Per-category spec resolution from the local data, so the + // synthesized spec carries the right fields before validation. + if push.IsTabular(a.Spec.Category) { + // Column schema: an explicit --schema wins; otherwise infer + // INT/FLOAT/VARCHAR types from the CSV so the customer doesn't + // hand-write one for the common case. + if a.SchemaFlag != "" { + sch, perr := push.ParseSchema(a.SchemaFlag) + if perr != nil { + return &exitError{code: 2, err: perr} + } + a.Spec.Schema = sch + } else { + sch, skipped, ierr := push.InferSchema(layout.LabelsCSV) + if ierr != nil { + return &exitError{code: 3, err: fmt.Errorf("inferring schema from CSV: %w", ierr)} + } + a.Spec.Schema = sch + _, _ = fmt.Fprintf(out, + "Inferred schema for %d column(s) from %s (override with --schema).\n", + len(sch), filepath.Base(layout.LabelsCSV)) + if len(skipped) > 0 { + _, _ = fmt.Fprintf(out, + " (skipped framework-managed column(s): %s)\n", strings.Join(skipped, ", ")) + } } - a.Spec.TargetSize = []int{w, h} - } else if len(layout.Images) > 0 { - if w, h, derr := push.DetectImageSize(layout.Images[0]); derr == nil { + } else { + // Image target resolution: the ingestor's image_classification + // default is 512x512 and it VALIDATES (it does not resize), so + // a mismatch hard-fails. Honour an explicit --target-size; + // otherwise auto-detect from the first image so the common + // "all my images are NxN" case just works. + if a.TargetSizeFlag != "" { + w, h, perr := push.ParseTargetSize(a.TargetSizeFlag) + if perr != nil { + return &exitError{code: 2, err: perr} + } a.Spec.TargetSize = []int{w, h} - _, _ = fmt.Fprintf(out, - "Auto-detected image target size %dx%d from %s (override with --target-size).\n", - w, h, filepath.Base(layout.Images[0])) - } else { - _, _ = fmt.Fprintf(errOut, - "Note: couldn't auto-detect image size (%v); using the ingestor "+ - "default. Pass --target-size WxH if ingestion reports a "+ - "resolution mismatch.\n", derr) + } else if len(layout.Images) > 0 { + if w, h, derr := push.DetectImageSize(layout.Images[0]); derr == nil { + a.Spec.TargetSize = []int{w, h} + _, _ = fmt.Fprintf(out, + "Auto-detected image target size %dx%d from %s (override with --target-size).\n", + w, h, filepath.Base(layout.Images[0])) + } else { + _, _ = fmt.Fprintf(errOut, + "Note: couldn't auto-detect image size (%v); using the ingestor "+ + "default. Pass --target-size WxH if ingestion reports a "+ + "resolution mismatch.\n", derr) + } } } @@ -538,10 +603,20 @@ func printPushPreflight( // as cli/cluster.go and cli/ingest.go: a pipe-write failure // shouldn't convert success into failure. The exit code is // the contract. + cat, _ := spec["category"].(string) + tabular := push.IsTabular(cat) + _, _ = fmt.Fprintf(out, "Local dataset:\n") _, _ = fmt.Fprintf(out, " root: %s\n", layout.Root) - _, _ = fmt.Fprintf(out, " labels.csv: %s\n", layout.LabelsCSV) - _, _ = fmt.Fprintf(out, " images: %d files\n", len(layout.Images)) + if tabular { + _, _ = fmt.Fprintf(out, " data CSV: %s\n", layout.LabelsCSV) + if sch, ok := spec["schema"].(map[string]string); ok { + _, _ = fmt.Fprintf(out, " columns: %d\n", len(sch)) + } + } else { + _, _ = fmt.Fprintf(out, " labels.csv: %s\n", layout.LabelsCSV) + _, _ = fmt.Fprintf(out, " images: %d files\n", len(layout.Images)) + } _, _ = fmt.Fprintf(out, " total size: %s\n", push.HumanBytes(layout.TotalBytes)) _, _ = fmt.Fprintln(out) @@ -562,7 +637,15 @@ func printPushPreflight( _, _ = fmt.Fprintf(out, " table: %s\n", spec["table"]) _, _ = fmt.Fprintf(out, " category: %s\n", spec["category"]) _, _ = fmt.Fprintf(out, " intent: %s\n", spec["intent"]) - _, _ = fmt.Fprintf(out, " label column: %s\n", spec["label"]) + switch lbl := spec["label"].(type) { + case string: + _, _ = fmt.Fprintf(out, " label column: %s\n", lbl) + case map[string]any: + _, _ = fmt.Fprintf(out, " label column: %v (policy: %v)\n", lbl["column"], lbl["policy"]) + } + if tc, ok := spec["time_column"].(string); ok && tc != "" { + _, _ = fmt.Fprintf(out, " time column: %s\n", tc) + } _, _ = fmt.Fprintf(out, " destination: %s\n", push.FinalDestPrefix(spec["table"].(string))) _, _ = fmt.Fprintln(out) diff --git a/internal/cli/dataset_test.go b/internal/cli/dataset_test.go index 96d7ea0..c31986e 100644 --- a/internal/cli/dataset_test.go +++ b/internal/cli/dataset_test.go @@ -63,19 +63,19 @@ func execDatasetPush(t *testing.T, args []string) (exitCode int, stdout, stderr return ExitCodeFromError(err), so.String(), se.String() } -// TestDatasetPush_UnsupportedCategory_ExitsTwo: v0.1 only supports -// image_classification end-to-end (epic #147 non-goals); the -// CLI-side gate runs before schema validation so a customer who -// passes --category=tabular_classification gets the actionable -// "v0.1 supports only image_classification" message instead of -// the schema's confusing "missing property 'schema'" (which the -// customer has no way to supply in v0.1). Bugbot review-on-self -// caught the missing gate on PR-a; landing it here. +// TestDatasetPush_UnsupportedCategory_ExitsTwo: the CLI-side category +// gate runs before schema validation so a customer who passes a +// not-yet-supported category gets an actionable message (exit 2) +// rather than the schema's confusing missing-property error. Today's +// supported set is image_classification + the tabular / time-series +// family; the other image categories (which need annotation/mask +// sidecar staging), the text family, and nonsense values are gated +// out here. Bugbot review-on-self caught the missing gate on PR-a. func TestDatasetPush_UnsupportedCategory_ExitsTwo(t *testing.T) { root := imgcLayout(t) for _, badCategory := range []string{ - "tabular_classification", // schema-valid but v0.1-unsupported - "object_detection", // ditto + "object_detection", // image category, needs sidecar staging (later) + "text_classification", // text family (later) "definitely-not-a-category", // nonsense; gate catches this too } { t.Run(badCategory, func(t *testing.T) { diff --git a/internal/push/category.go b/internal/push/category.go new file mode 100644 index 0000000..6cf21c7 --- /dev/null +++ b/internal/push/category.go @@ -0,0 +1,51 @@ +package push + +// Category families. These mirror data-ingestors' +// tracebloc_ingestor/cli/conventions.py groupings so the CLI's +// per-category behaviour (which flags are required, which local +// layout to expect, which spec fields to emit) stays in lock-step +// with what the ingestor actually resolves. +// +// Kept as a single source of truth here rather than scattered +// string comparisons across spec.go / dataset.go. + +// imageCategories take a labels CSV + an images/ directory (plus, +// for some, extra sidecar dirs handled in later increments). +var imageCategories = map[string]bool{ + "image_classification": true, + "object_detection": true, + "keypoint_detection": true, + "semantic_segmentation": true, + "instance_segmentation": true, +} + +// tabularCategories take a single CSV whose columns are described by +// a `schema` (column → SQL type) map. No sidecar files. +var tabularCategories = map[string]bool{ + "tabular_classification": true, + "tabular_regression": true, + "time_series_forecasting": true, + "time_to_event_prediction": true, +} + +// regressionClassCategories predict a numeric target rather than a +// class. The schema requires the label in object form with an +// explicit `policy` so the raw target never ships to the central +// backend by default (policy=bucket bins it first). +var regressionClassCategories = map[string]bool{ + "tabular_regression": true, + "time_series_forecasting": true, + "time_to_event_prediction": true, +} + +// IsImage reports whether category uses the labels.csv + images/ +// local layout. +func IsImage(category string) bool { return imageCategories[category] } + +// IsTabular reports whether category uses the single-CSV + schema +// local layout (no sidecar files). +func IsTabular(category string) bool { return tabularCategories[category] } + +// IsRegressionClass reports whether category predicts a numeric +// target and therefore needs label.policy (object label form). +func IsRegressionClass(category string) bool { return regressionClassCategories[category] } diff --git a/internal/push/spec.go b/internal/push/spec.go index ebb7d98..6767440 100644 --- a/internal/push/spec.go +++ b/internal/push/spec.go @@ -141,8 +141,27 @@ type SpecArgs struct { // spec.file_options.target_size so the customer's actual // resolution wins. Empty (len 0) ⇒ omit and let the ingestor // default apply. Populated by the CLI from --target-size or by - // auto-detecting the first image. + // auto-detecting the first image. (Image categories only.) TargetSize []int + + // Schema is the column→SQL-type map for tabular / time-series + // categories (required by the schema for those). Populated by the + // CLI from --schema or by inferring types from the CSV. Ignored + // for image categories. + Schema map[string]string + + // LabelPolicy is "passthrough" or "bucket". Regression-class + // categories (tabular_regression, time_series_forecasting, + // time_to_event_prediction) require the object label form with a + // policy; the CLI defaults it to "bucket" so the raw numeric + // target never ships to the central backend. Ignored for + // classification categories (which emit the string label form). + LabelPolicy string + + // TimeColumn names the time column for time_to_event_prediction. + // Emitted as the top-level `time_column` field. Empty ⇒ the + // ingestor falls back to a column named "time". + TimeColumn string } // Build produces the ingest.v1.json-conforming spec map. The @@ -173,19 +192,31 @@ func (a SpecArgs) Build() map[string]any { "category": a.Category, "table": a.Table, "intent": a.Intent, - // Trailing slash on `images` matches the schema example - // (data-ingestors/examples/yaml/image_classification.yaml, - // line 14). The ingestor treats it as a directory glob. - "csv": path.Join(prefix, "labels.csv"), - "images": path.Join(prefix, "images") + "/", - "label": a.LabelColumn, + "csv": path.Join(prefix, "labels.csv"), + } + if IsTabular(a.Category) { + a.buildTabular(spec) + } else { + // Image categories (and any not-yet-special-cased category — + // the schema validator produces the canonical error for those). + a.buildImage(spec, prefix) } + return spec +} + +// buildImage fills in the image-category fields: the images/ sidecar +// dir, the label column, and the optional target_size override. +func (a SpecArgs) buildImage(spec map[string]any, prefix string) { + // Trailing slash on `images` matches the schema example + // (data-ingestors/examples/yaml/image_classification.yaml); the + // ingestor treats it as a directory glob. + spec["images"] = path.Join(prefix, "images") + "/" + spec["label"] = a.LabelColumn // Emit the image resolution under spec.file_options.target_size — - // the same override key the helm flow + data-ingestors' - // conventions.resolve honour (it merges spec.file_options over the - // per-category default). Without this, image_classification - // defaults to 512x512 and the ingestor's Image Resolution - // Validator rejects any other size. + // the same override key data-ingestors' conventions.resolve + // honours. Without it, image_classification defaults to 512x512 + // and the ingestor's Image Resolution Validator rejects any other + // size. if len(a.TargetSize) == 2 { spec["spec"] = map[string]any{ "file_options": map[string]any{ @@ -193,7 +224,31 @@ func (a SpecArgs) Build() map[string]any { }, } } - return spec +} + +// buildTabular fills in the tabular / time-series fields: the column +// schema, the label (string form for classification, object form with +// a policy for regression-class), and time_column for +// time_to_event_prediction. +func (a SpecArgs) buildTabular(spec map[string]any) { + spec["schema"] = a.Schema + if IsRegressionClass(a.Category) { + // Regression-class tasks require the object label form with an + // explicit policy (the schema enforces this). Default to + // `bucket` so the raw numeric target never ships to the central + // backend unless the customer opts into passthrough. + policy := a.LabelPolicy + if policy == "" { + policy = "bucket" + } + spec["label"] = map[string]any{"column": a.LabelColumn, "policy": policy} + } else { + // tabular_classification: plain column-name label. + spec["label"] = a.LabelColumn + } + if a.Category == "time_to_event_prediction" && a.TimeColumn != "" { + spec["time_column"] = a.TimeColumn + } } // SharedRoot is the in-cluster mount path of the chart's shared PVC diff --git a/internal/push/spec_test.go b/internal/push/spec_test.go index a6301cb..79f3b80 100644 --- a/internal/push/spec_test.go +++ b/internal/push/spec_test.go @@ -197,6 +197,84 @@ func TestBuild_NoTargetSize_OmitsSpecBlock(t *testing.T) { } } +// TestBuild_Tabular_PassesSchema pins the tabular Build branch: it +// emits schema-valid specs for the three label shapes — a string +// label (tabular_classification), an object label+policy +// (tabular_regression), and an object label + time_column +// (time_to_event_prediction) — and never emits an images field. +func TestBuild_Tabular_PassesSchema(t *testing.T) { + v, err := schema.NewV1Validator() + if err != nil { + t.Fatalf("NewV1Validator: %v", err) + } + check := func(name string, a SpecArgs, wantLabelObject bool) { + t.Run(name, func(t *testing.T) { + spec := a.Build() + if _, hasImages := spec["images"]; hasImages { + t.Errorf("tabular spec emitted an images field: %v", spec) + } + if _, hasSchema := spec["schema"]; !hasSchema { + t.Errorf("tabular spec missing schema: %v", spec) + } + if wantLabelObject { + if _, ok := spec["label"].(map[string]any); !ok { + t.Errorf("label = %#v, want object form {column, policy}", spec["label"]) + } + } else { + if _, ok := spec["label"].(string); !ok { + t.Errorf("label = %#v, want string form", spec["label"]) + } + } + b, err := yaml.Marshal(spec) + if err != nil { + t.Fatalf("marshal: %v", err) + } + _, errs, parseErr := v.ValidateYAML(b) + if parseErr != nil { + t.Fatalf("parse: %v\n%s", parseErr, b) + } + if len(errs) != 0 { + t.Fatalf("schema validation failed: %s\n%s", schema.FormatErrors(errs), b) + } + }) + } + + check("tabular_classification", SpecArgs{ + Table: "t_clf", Category: "tabular_classification", Intent: "train", + LabelColumn: "label", + Schema: map[string]string{"f0": "FLOAT", "f1": "FLOAT", "label": "INT"}, + }, false) + + check("tabular_regression", SpecArgs{ + Table: "t_reg", Category: "tabular_regression", Intent: "train", + LabelColumn: "price", + Schema: map[string]string{"sqft": "FLOAT", "price": "FLOAT"}, + }, true) + + check("time_to_event_prediction", SpecArgs{ + Table: "t_tte", Category: "time_to_event_prediction", Intent: "train", + LabelColumn: "DEATH_EVENT", TimeColumn: "time", + Schema: map[string]string{"age": "INT", "time": "INT", "DEATH_EVENT": "INT"}, + }, true) +} + +// TestBuild_Tabular_RegressionDefaultsPolicyBucket: regression-class +// categories default to policy=bucket so the raw numeric target never +// ships to the central backend unless the customer opts out. +func TestBuild_Tabular_RegressionDefaultsPolicyBucket(t *testing.T) { + spec := SpecArgs{ + Table: "t", Category: "tabular_regression", Intent: "train", + LabelColumn: "y", Schema: map[string]string{"x": "FLOAT", "y": "FLOAT"}, + }.Build() + lbl, ok := spec["label"].(map[string]any) + if !ok { + t.Fatalf("label = %#v, want object", spec["label"]) + } + if lbl["policy"] != "bucket" { + t.Errorf("default policy = %v, want bucket", lbl["policy"]) + } +} + // TestValidateTableName_Accepts pins the names that MUST pass — // the real-world example tables plus a few edge shapes (single // char, leading underscore, mixed case, digits). A regression diff --git a/internal/push/tabular.go b/internal/push/tabular.go new file mode 100644 index 0000000..8b10e74 --- /dev/null +++ b/internal/push/tabular.go @@ -0,0 +1,229 @@ +package push + +import ( + "encoding/csv" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strconv" + "strings" +) + +// reservedColumns are framework-managed columns the ingestor adds +// itself; a user schema must not redeclare them — data-ingestors' +// database.create_table rejects collisions with a clear error. Schema +// auto-inference skips them so a CSV that happens to carry an `id` +// (or data_id, filename, …) column doesn't produce a schema the +// ingestor refuses. `label` is intentionally NOT reserved — it's the +// mapped label column. Mirrors database.py's _RESERVED set. +var reservedColumns = map[string]bool{ + "id": true, + "created_at": true, + "updated_at": true, + "status": true, + "data_intent": true, + "data_id": true, + "filename": true, + "extension": true, + "annotation": true, + "ingestor_id": true, +} + +// schemaInferenceSampleRows caps how many data rows InferSchema reads +// to decide each column's type. The whole CSV would be more accurate +// but a few thousand rows is plenty to distinguish INT/FLOAT/text in +// practice, and bounds the work for large files. A column whose true +// type only reveals itself past the sample (e.g. an int column that +// turns float on row 10k) is the case --schema exists to override. +const schemaInferenceSampleRows = 5000 + +// DiscoverTabular validates a local directory for a tabular / +// time-series ingestion. Unlike the image layout, tabular categories +// have NO sidecar files — the dataset IS a single CSV. The directory +// must contain exactly one .csv file; that becomes the labels/data +// CSV staged for the ingestor. +// +// The returned LocalLayout reuses the image layout's LabelsCSV field +// (staged as labels.csv) with an empty Images slice, so the existing +// tar/stream machinery handles it unchanged. +func DiscoverTabular(rootDir string) (*LocalLayout, error) { + abs, err := filepath.Abs(rootDir) + if err != nil { + return nil, fmt.Errorf("resolving %q: %w", rootDir, err) + } + st, err := os.Stat(abs) + if err != nil { + return nil, fmt.Errorf("reading dataset directory %q: %w", abs, err) + } + if !st.IsDir() { + return nil, fmt.Errorf( + "%q is not a directory; pass the directory containing the dataset CSV", abs) + } + + entries, err := os.ReadDir(abs) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", abs, err) + } + var csvs []string + for _, e := range entries { + if e.IsDir() { + continue + } + if strings.EqualFold(filepath.Ext(e.Name()), ".csv") { + csvs = append(csvs, e.Name()) + } + } + sort.Strings(csvs) + switch len(csvs) { + case 0: + return nil, fmt.Errorf( + "no .csv file found in %q. Tabular / time-series categories expect a "+ + "single CSV holding the dataset (one column per feature, plus the "+ + "label column).", abs) + case 1: + // happy path + default: + return nil, fmt.Errorf( + "found %d .csv files in %q (%s); the tabular layout expects exactly one. "+ + "Put the dataset CSV in its own directory and re-run.", + len(csvs), abs, strings.Join(csvs, ", ")) + } + + csvPath := filepath.Join(abs, csvs[0]) + // Lstat (not Stat) so a symlinked CSV is rejected rather than + // silently followed — mirrors the image layout's symlink guard. + info, err := os.Lstat(csvPath) + if err != nil { + return nil, fmt.Errorf("stat %s: %w", csvs[0], err) + } + if err := rejectSymlink(info, csvs[0]); err != nil { + return nil, err + } + if info.Size() > MaxSingleFileBytes { + return nil, sizeError(csvs[0], info.Size(), MaxSingleFileBytes) + } + + layout := &LocalLayout{Root: abs, LabelsCSV: csvPath, TotalBytes: info.Size()} + if layout.TotalBytes > MaxTotalBytes { + return nil, fmt.Errorf( + "dataset is %s, exceeds v0.1 cap of %s. For larger datasets, the "+ + "cloud-source path is on the v0.2 roadmap (tracebloc/client#147).", + HumanBytes(layout.TotalBytes), HumanBytes(MaxTotalBytes)) + } + return layout, nil +} + +// ParseSchema parses a --schema flag value of the form +// "col:TYPE,col:TYPE,..." into a column→type map. Types are passed +// through verbatim (the ingestor validates them against the SQL types +// it supports: INT, BIGINT, FLOAT, BOOLEAN, DATE, DATETIME, +// TIMESTAMP, TIME, TEXT, VARCHAR(n), ...). Whitespace around tokens +// is trimmed. +func ParseSchema(s string) (map[string]string, error) { + out := map[string]string{} + for _, pair := range strings.Split(s, ",") { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + col, typ, ok := strings.Cut(pair, ":") + col, typ = strings.TrimSpace(col), strings.TrimSpace(typ) + if !ok || col == "" || typ == "" { + return nil, fmt.Errorf( + "schema entry %q must be col:TYPE (e.g. age:INT,price:FLOAT)", pair) + } + out[col] = typ + } + if len(out) == 0 { + return nil, fmt.Errorf("--schema is empty; expected col:TYPE,col:TYPE,...") + } + return out, nil +} + +// InferSchema reads the CSV header and a sample of rows and infers a +// column→SQL-type map: all-integer columns → INT, otherwise +// all-numeric → FLOAT, otherwise VARCHAR(255). Empty cells are +// ignored when judging a column; a column with no non-empty sampled +// value falls back to VARCHAR(255). +// +// It's a convenience so customers don't hand-write a --schema for the +// common case. Non-numeric specials (timestamps, dates, booleans) +// infer as VARCHAR(255); pass --schema to declare them precisely. +// +// Framework-managed columns (see reservedColumns — id, data_id, …) +// are skipped and returned as the second value so the caller can tell +// the customer they weren't included. +func InferSchema(csvPath string) (schema map[string]string, skipped []string, err error) { + f, err := os.Open(csvPath) + if err != nil { + return nil, nil, err + } + defer func() { _ = f.Close() }() + + r := csv.NewReader(f) + header, err := r.Read() + if err != nil { + return nil, nil, fmt.Errorf("reading CSV header from %s: %w", csvPath, err) + } + if len(header) == 0 { + return nil, nil, fmt.Errorf("CSV %s has no columns", csvPath) + } + + // Per-column running judgement. + couldBeInt := make([]bool, len(header)) + couldBeFloat := make([]bool, len(header)) + sawValue := make([]bool, len(header)) + for i := range header { + couldBeInt[i] = true + couldBeFloat[i] = true + } + + for n := 0; n < schemaInferenceSampleRows; n++ { + row, err := r.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, nil, fmt.Errorf("reading CSV row from %s: %w", csvPath, err) + } + for i := 0; i < len(header) && i < len(row); i++ { + v := strings.TrimSpace(row[i]) + if v == "" { + continue + } + sawValue[i] = true + if couldBeInt[i] { + if _, e := strconv.ParseInt(v, 10, 64); e != nil { + couldBeInt[i] = false + } + } + if couldBeFloat[i] { + if _, e := strconv.ParseFloat(v, 64); e != nil { + couldBeFloat[i] = false + } + } + } + } + + schema = make(map[string]string, len(header)) + for i, col := range header { + col = strings.TrimSpace(col) + if reservedColumns[col] { + // Framework-managed (id, data_id, …): the ingestor adds it + // and rejects a schema that redeclares it. Skip + report. + skipped = append(skipped, col) + continue + } + switch { + case sawValue[i] && couldBeInt[i]: + schema[col] = "INT" + case sawValue[i] && couldBeFloat[i]: + schema[col] = "FLOAT" + default: + schema[col] = "VARCHAR(255)" + } + } + return schema, skipped, nil +} diff --git a/internal/push/tabular_test.go b/internal/push/tabular_test.go new file mode 100644 index 0000000..322101c --- /dev/null +++ b/internal/push/tabular_test.go @@ -0,0 +1,153 @@ +package push + +import ( + "os" + "path/filepath" + "testing" +) + +func writeFile(t *testing.T, dir, name, body string) string { + t.Helper() + p := filepath.Join(dir, name) + if err := os.WriteFile(p, []byte(body), 0o644); err != nil { + t.Fatal(err) + } + return p +} + +// TestDiscoverTabular_SingleCSV: a directory with exactly one CSV +// resolves to a layout whose LabelsCSV is that file and whose Images +// slice is empty (so the existing tar/stream machinery stages just +// the CSV). +func TestDiscoverTabular_SingleCSV(t *testing.T) { + dir := t.TempDir() + csv := writeFile(t, dir, "data.csv", "a,b\n1,2\n") + + layout, err := DiscoverTabular(dir) + if err != nil { + t.Fatalf("DiscoverTabular: %v", err) + } + if layout.LabelsCSV != csv { + t.Errorf("LabelsCSV = %q, want %q", layout.LabelsCSV, csv) + } + if len(layout.Images) != 0 { + t.Errorf("Images = %v, want empty (tabular has no sidecar files)", layout.Images) + } + if layout.TotalBytes == 0 { + t.Errorf("TotalBytes = 0, want the CSV's size") + } +} + +// TestDiscoverTabular_NoCSV and _MultipleCSV: the layout requires +// exactly one CSV; zero or many is a clear, actionable error rather +// than a guess. +func TestDiscoverTabular_NoCSV(t *testing.T) { + dir := t.TempDir() + writeFile(t, dir, "notes.txt", "hello") + if _, err := DiscoverTabular(dir); err == nil { + t.Error("DiscoverTabular with no .csv returned nil error") + } +} + +func TestDiscoverTabular_MultipleCSV(t *testing.T) { + dir := t.TempDir() + writeFile(t, dir, "a.csv", "x\n1\n") + writeFile(t, dir, "b.csv", "y\n2\n") + if _, err := DiscoverTabular(dir); err == nil { + t.Error("DiscoverTabular with two .csv files returned nil error") + } +} + +// TestInferSchema covers the INT / FLOAT / VARCHAR inference from a +// CSV header + sample rows. Integer-only columns → INT, numeric (with +// a decimal) → FLOAT, anything else → VARCHAR(255). +func TestInferSchema(t *testing.T) { + dir := t.TempDir() + csv := writeFile(t, dir, "data.csv", + "count,age,price,name\n1,30,9.99,alice\n2,40,19.5,bob\n") + + schema, _, err := InferSchema(csv) + if err != nil { + t.Fatalf("InferSchema: %v", err) + } + want := map[string]string{ + "count": "INT", + "age": "INT", + "price": "FLOAT", + "name": "VARCHAR(255)", + } + for col, typ := range want { + if schema[col] != typ { + t.Errorf("schema[%q] = %q, want %q (full: %v)", col, schema[col], typ, schema) + } + } +} + +// TestInferSchema_EmptyColumnIsVarchar: a column with no non-empty +// sampled value can't be typed, so it falls back to VARCHAR(255) +// rather than being mislabeled INT/FLOAT. +func TestInferSchema_EmptyColumnIsVarchar(t *testing.T) { + dir := t.TempDir() + csv := writeFile(t, dir, "data.csv", "filled,empty\n1,\n2,\n") + schema, _, err := InferSchema(csv) + if err != nil { + t.Fatalf("InferSchema: %v", err) + } + if schema["empty"] != "VARCHAR(255)" { + t.Errorf("schema[empty] = %q, want VARCHAR(255)", schema["empty"]) + } + if schema["filled"] != "INT" { + t.Errorf("schema[filled] = %q, want INT", schema["filled"]) + } +} + +// TestInferSchema_SkipsReservedColumns: a CSV with an `id` (or other +// framework-managed) column must NOT produce a schema that includes +// it — data-ingestors' create_table rejects such collisions (the +// #135b guard). The reserved columns come back in the skipped list. +func TestInferSchema_SkipsReservedColumns(t *testing.T) { + dir := t.TempDir() + csv := writeFile(t, dir, "data.csv", "id,feature_00,label\n1,1.5,0\n2,2.5,1\n") + + schema, skipped, err := InferSchema(csv) + if err != nil { + t.Fatalf("InferSchema: %v", err) + } + if _, present := schema["id"]; present { + t.Errorf("schema includes reserved column id: %v", schema) + } + if schema["feature_00"] != "FLOAT" || schema["label"] != "INT" { + t.Errorf("schema = %v, want feature_00:FLOAT, label:INT", schema) + } + foundID := false + for _, s := range skipped { + if s == "id" { + foundID = true + } + } + if !foundID { + t.Errorf("skipped = %v, want it to contain id", skipped) + } +} + +func TestParseSchema(t *testing.T) { + got, err := ParseSchema("age:INT, price:FLOAT ,name:VARCHAR(255)") + if err != nil { + t.Fatalf("ParseSchema: %v", err) + } + want := map[string]string{"age": "INT", "price": "FLOAT", "name": "VARCHAR(255)"} + if len(got) != len(want) { + t.Fatalf("ParseSchema len = %d, want %d (%v)", len(got), len(want), got) + } + for k, v := range want { + if got[k] != v { + t.Errorf("ParseSchema[%q] = %q, want %q", k, got[k], v) + } + } + + for _, bad := range []string{"", "age", "age:", ":INT", "age=INT"} { + if _, err := ParseSchema(bad); err == nil { + t.Errorf("ParseSchema(%q) = nil error, want rejection", bad) + } + } +}