Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 139 additions & 56 deletions internal/cli/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"path/filepath"
"strings"

"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -68,14 +69,17 @@ func newDatasetPushCmd() *cobra.Command {
contextOverride string
nsOverride string

// Ingest-spec flags. The set is intentionally
// image_classification-only for v0.1 per epic #147
// non-goals; other categories are one-PR additions in v0.2.
// Ingest-spec flags. image_classification + the tabular /
// time-series family are supported today; text + detection +
// segmentation land in later increments.
table string
category string
intent string
labelColumn string
targetSize string
schemaFlag string
labelPolicy string
timeColumn string

// Operations flags.
dryRun bool
Expand Down Expand Up @@ -142,12 +146,16 @@ Exit codes:
RunE: func(cmd *cobra.Command, args []string) error {
return runDatasetPush(cmd.Context(), cmd.OutOrStdout(), cmd.ErrOrStderr(),
runDatasetPushArgs{
LocalPath: args[0],
Kubeconfig: kubeconfigPath,
Context: contextOverride,
Namespace: nsOverride,
Spec: push.SpecArgs{Table: table, Category: category, Intent: intent, LabelColumn: labelColumn},
LocalPath: args[0],
Kubeconfig: kubeconfigPath,
Context: contextOverride,
Namespace: nsOverride,
Spec: push.SpecArgs{
Table: table, Category: category, Intent: intent,
LabelColumn: labelColumn, LabelPolicy: labelPolicy, TimeColumn: timeColumn,
},
TargetSizeFlag: targetSize,
SchemaFlag: schemaFlag,
DryRun: dryRun,
IngestorSAName: ingestorSAName,
StagePodImage: stagePodImage,
Expand All @@ -173,14 +181,23 @@ Exit codes:
cmd.Flags().StringVar(&table, "table", "",
"destination table name (MySQL identifier; matches /data/shared/<table>/ on the PVC)")
cmd.Flags().StringVar(&category, "category", "image_classification",
"task category (v0.1 only supports image_classification; see tracebloc/client#147 non-goals)")
"task category: image_classification, tabular_classification, tabular_regression, "+
"time_series_forecasting, time_to_event_prediction")
cmd.Flags().StringVar(&intent, "intent", "",
"intent: train|test")
cmd.Flags().StringVar(&labelColumn, "label-column", "",
"column name in labels.csv that holds the label")
"name of the label/target column (in labels.csv for image categories, in the data CSV for tabular)")
cmd.Flags().StringVar(&targetSize, "target-size", "",
"image resolution as WxH (e.g. 512x512). Default: auto-detected from the first image. "+
"image categories only: resolution as WxH (e.g. 512x512). Default: auto-detected from the first image. "+
"All images must share this resolution β€” the ingestor validates it, it does not resize.")
cmd.Flags().StringVar(&schemaFlag, "schema", "",
"tabular/time-series only: column types as col:TYPE,col:TYPE (e.g. age:INT,price:FLOAT). "+
"Default: inferred from the CSV (INT/FLOAT/VARCHAR).")
cmd.Flags().StringVar(&labelPolicy, "label-policy", "",
"regression-class only (tabular_regression, time_series_forecasting, time_to_event_prediction): "+
"passthrough|bucket (default bucket β€” bins the target so the raw value never leaves the cluster)")
cmd.Flags().StringVar(&timeColumn, "time-column", "",
"time_to_event_prediction only: name of the time/duration column (default: a column named \"time\")")

cmd.Flags().BoolVar(&dryRun, "dry-run", false,
"validate + discover + walk, but don't create any cluster resources")
Expand Down Expand Up @@ -215,7 +232,8 @@ type runDatasetPushArgs struct {
Context string
Namespace string
Spec push.SpecArgs
TargetSizeFlag string // raw --target-size; resolved after Discover
TargetSizeFlag string // raw --target-size; resolved after Discover (image)
SchemaFlag string // raw --schema; resolved or inferred after Discover (tabular)
DryRun bool
IngestorSAName string
StagePodImage string
Expand Down Expand Up @@ -249,56 +267,103 @@ func runDatasetPush(ctx context.Context, out, errOut io.Writer, a runDatasetPush
return &exitError{code: 2, err: err}
}

// 2. v0.1 category gate. Runs BEFORE schema validation because
// schema-valid-but-unsupported categories (e.g.
// tabular_classification) would otherwise fail with the
// schema's "missing property 'schema'" message β€” confusing
// for the customer who has no way to set --schema in v0.1.
// Nonsense categories (typos) also hit this gate; the
// "only image_classification in v0.1" message is more
// actionable than the schema's 11-option enum list anyway.
// Bugbot's review-on-self caught the missing gate on PR-a.
if a.Spec.Category != "" && a.Spec.Category != "image_classification" {
// 2. Category gate. Runs BEFORE schema validation so an
// unsupported category gets a clear, actionable CLI message
// rather than the schema's terse enum / missing-property error.
// Supported today: image_classification + the tabular /
// time-series family. The other image categories need sidecar
// (annotation/mask) staging the CLI doesn't do yet, and the
// text family needs a texts/sequences dir β€” both land in later
// increments. A typo'd category also lands here with a clear
// list rather than the schema's 11-option enum dump.
switch {
case a.Spec.Category == "":
// Left empty by a caller; let the schema produce the canonical
// "category is required" error downstream.
case push.IsTabular(a.Spec.Category) || a.Spec.Category == "image_classification":
// supported
case push.IsImage(a.Spec.Category):
return &exitError{code: 2, err: fmt.Errorf(
"category %q is not supported in v0.1 (only image_classification). "+
"Other categories are one-PR additions in v0.2 β€” see "+
"tracebloc/client#147 non-goals.", a.Spec.Category)}
"category %q isn't supported by the CLI yet β€” it needs annotation/mask "+
"sidecar staging that's coming in a later release. Supported image "+
"category: image_classification.", a.Spec.Category)}
default:
return &exitError{code: 2, err: fmt.Errorf(
"category %q isn't supported by the CLI yet. Supported: image_classification, "+
"tabular_classification, tabular_regression, time_series_forecasting, "+
"time_to_event_prediction. (Text / detection / segmentation are coming; "+
"use the helm flow for those meanwhile.)", a.Spec.Category)}
}

// 3. Walk the local directory FIRST. Enforces layout + size caps,
// and gives us the image list the target-size auto-detect below
// needs. Both this and the schema check are local "fail fast"
// steps; doing the walk first lets the synthesized spec carry
// the resolved target_size.
layout, err := push.Discover(a.LocalPath)
// 3. Walk the local directory FIRST (local "fail fast"), dispatched
// by category family. Image categories expect labels.csv +
// images/; tabular / time-series categories expect a single
// data CSV. The walk also yields what the per-category
// resolution below needs (the image list for target-size, the
// CSV for schema inference).
var (
layout *push.LocalLayout
err error
)
if push.IsTabular(a.Spec.Category) {
layout, err = push.DiscoverTabular(a.LocalPath)
} else {
layout, err = push.Discover(a.LocalPath)
}
if err != nil {
return &exitError{code: 3, err: err}
}

// 3a. Resolve the image target resolution. The ingestor's
// image_classification default is 512x512 and it VALIDATES
// (it does not resize), so a mismatch hard-fails the run with
// an "incorrect resolution" error. Honour an explicit
// --target-size; otherwise auto-detect from the first image so
// the common "all my images are NxN" case just works without
// the customer needing to know the knob exists.
if a.TargetSizeFlag != "" {
w, h, perr := push.ParseTargetSize(a.TargetSizeFlag)
if perr != nil {
return &exitError{code: 2, err: perr}
// 3a. Per-category spec resolution from the local data, so the
// synthesized spec carries the right fields before validation.
if push.IsTabular(a.Spec.Category) {
// Column schema: an explicit --schema wins; otherwise infer
// INT/FLOAT/VARCHAR types from the CSV so the customer doesn't
// hand-write one for the common case.
if a.SchemaFlag != "" {
sch, perr := push.ParseSchema(a.SchemaFlag)
if perr != nil {
return &exitError{code: 2, err: perr}
}
a.Spec.Schema = sch
} else {
sch, skipped, ierr := push.InferSchema(layout.LabelsCSV)
if ierr != nil {
return &exitError{code: 3, err: fmt.Errorf("inferring schema from CSV: %w", ierr)}
}
a.Spec.Schema = sch
_, _ = fmt.Fprintf(out,
"Inferred schema for %d column(s) from %s (override with --schema).\n",
len(sch), filepath.Base(layout.LabelsCSV))
if len(skipped) > 0 {
_, _ = fmt.Fprintf(out,
" (skipped framework-managed column(s): %s)\n", strings.Join(skipped, ", "))
}
}
a.Spec.TargetSize = []int{w, h}
} else if len(layout.Images) > 0 {
if w, h, derr := push.DetectImageSize(layout.Images[0]); derr == nil {
} else {
// Image target resolution: the ingestor's image_classification
// default is 512x512 and it VALIDATES (it does not resize), so
// a mismatch hard-fails. Honour an explicit --target-size;
// otherwise auto-detect from the first image so the common
// "all my images are NxN" case just works.
if a.TargetSizeFlag != "" {
w, h, perr := push.ParseTargetSize(a.TargetSizeFlag)
if perr != nil {
return &exitError{code: 2, err: perr}
}
a.Spec.TargetSize = []int{w, h}
_, _ = fmt.Fprintf(out,
"Auto-detected image target size %dx%d from %s (override with --target-size).\n",
w, h, filepath.Base(layout.Images[0]))
} else {
_, _ = fmt.Fprintf(errOut,
"Note: couldn't auto-detect image size (%v); using the ingestor "+
"default. Pass --target-size WxH if ingestion reports a "+
"resolution mismatch.\n", derr)
} else if len(layout.Images) > 0 {
if w, h, derr := push.DetectImageSize(layout.Images[0]); derr == nil {
a.Spec.TargetSize = []int{w, h}
_, _ = fmt.Fprintf(out,
"Auto-detected image target size %dx%d from %s (override with --target-size).\n",
w, h, filepath.Base(layout.Images[0]))
} else {
_, _ = fmt.Fprintf(errOut,
"Note: couldn't auto-detect image size (%v); using the ingestor "+
"default. Pass --target-size WxH if ingestion reports a "+
"resolution mismatch.\n", derr)
}
}
}

Expand Down Expand Up @@ -538,10 +603,20 @@ func printPushPreflight(
// as cli/cluster.go and cli/ingest.go: a pipe-write failure
// shouldn't convert success into failure. The exit code is
// the contract.
cat, _ := spec["category"].(string)
tabular := push.IsTabular(cat)

_, _ = fmt.Fprintf(out, "Local dataset:\n")
_, _ = fmt.Fprintf(out, " root: %s\n", layout.Root)
_, _ = fmt.Fprintf(out, " labels.csv: %s\n", layout.LabelsCSV)
_, _ = fmt.Fprintf(out, " images: %d files\n", len(layout.Images))
if tabular {
_, _ = fmt.Fprintf(out, " data CSV: %s\n", layout.LabelsCSV)
if sch, ok := spec["schema"].(map[string]string); ok {
_, _ = fmt.Fprintf(out, " columns: %d\n", len(sch))
}
} else {
_, _ = fmt.Fprintf(out, " labels.csv: %s\n", layout.LabelsCSV)
_, _ = fmt.Fprintf(out, " images: %d files\n", len(layout.Images))
}
_, _ = fmt.Fprintf(out, " total size: %s\n", push.HumanBytes(layout.TotalBytes))
_, _ = fmt.Fprintln(out)

Expand All @@ -562,7 +637,15 @@ func printPushPreflight(
_, _ = fmt.Fprintf(out, " table: %s\n", spec["table"])
_, _ = fmt.Fprintf(out, " category: %s\n", spec["category"])
_, _ = fmt.Fprintf(out, " intent: %s\n", spec["intent"])
_, _ = fmt.Fprintf(out, " label column: %s\n", spec["label"])
switch lbl := spec["label"].(type) {
case string:
_, _ = fmt.Fprintf(out, " label column: %s\n", lbl)
case map[string]any:
_, _ = fmt.Fprintf(out, " label column: %v (policy: %v)\n", lbl["column"], lbl["policy"])
}
if tc, ok := spec["time_column"].(string); ok && tc != "" {
_, _ = fmt.Fprintf(out, " time column: %s\n", tc)
}
_, _ = fmt.Fprintf(out, " destination: %s\n", push.FinalDestPrefix(spec["table"].(string)))
_, _ = fmt.Fprintln(out)

Expand Down
20 changes: 10 additions & 10 deletions internal/cli/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ func execDatasetPush(t *testing.T, args []string) (exitCode int, stdout, stderr
return ExitCodeFromError(err), so.String(), se.String()
}

// TestDatasetPush_UnsupportedCategory_ExitsTwo: v0.1 only supports
// image_classification end-to-end (epic #147 non-goals); the
// CLI-side gate runs before schema validation so a customer who
// passes --category=tabular_classification gets the actionable
// "v0.1 supports only image_classification" message instead of
// the schema's confusing "missing property 'schema'" (which the
// customer has no way to supply in v0.1). Bugbot review-on-self
// caught the missing gate on PR-a; landing it here.
// TestDatasetPush_UnsupportedCategory_ExitsTwo: the CLI-side category
// gate runs before schema validation so a customer who passes a
// not-yet-supported category gets an actionable message (exit 2)
// rather than the schema's confusing missing-property error. Today's
// supported set is image_classification + the tabular / time-series
// family; the other image categories (which need annotation/mask
// sidecar staging), the text family, and nonsense values are gated
// out here. Bugbot review-on-self caught the missing gate on PR-a.
func TestDatasetPush_UnsupportedCategory_ExitsTwo(t *testing.T) {
root := imgcLayout(t)
for _, badCategory := range []string{
"tabular_classification", // schema-valid but v0.1-unsupported
"object_detection", // ditto
"object_detection", // image category, needs sidecar staging (later)
"text_classification", // text family (later)
"definitely-not-a-category", // nonsense; gate catches this too
} {
t.Run(badCategory, func(t *testing.T) {
Expand Down
51 changes: 51 additions & 0 deletions internal/push/category.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package push

// Category families. These mirror data-ingestors'
// tracebloc_ingestor/cli/conventions.py groupings so the CLI's
// per-category behaviour (which flags are required, which local
// layout to expect, which spec fields to emit) stays in lock-step
// with what the ingestor actually resolves.
//
// Kept as a single source of truth here rather than scattered
// string comparisons across spec.go / dataset.go.

// imageCategories take a labels CSV + an images/ directory (plus,
// for some, extra sidecar dirs handled in later increments).
var imageCategories = map[string]bool{
"image_classification": true,
"object_detection": true,
"keypoint_detection": true,
"semantic_segmentation": true,
"instance_segmentation": true,
}

// tabularCategories take a single CSV whose columns are described by
// a `schema` (column β†’ SQL type) map. No sidecar files.
var tabularCategories = map[string]bool{
"tabular_classification": true,
"tabular_regression": true,
"time_series_forecasting": true,
"time_to_event_prediction": true,
}

// regressionClassCategories predict a numeric target rather than a
// class. The schema requires the label in object form with an
// explicit `policy` so the raw target never ships to the central
// backend by default (policy=bucket bins it first).
var regressionClassCategories = map[string]bool{
"tabular_regression": true,
"time_series_forecasting": true,
"time_to_event_prediction": true,
}

// IsImage reports whether category uses the labels.csv + images/
// local layout.
func IsImage(category string) bool { return imageCategories[category] }

// IsTabular reports whether category uses the single-CSV + schema
// local layout (no sidecar files).
func IsTabular(category string) bool { return tabularCategories[category] }

// IsRegressionClass reports whether category predicts a numeric
// target and therefore needs label.policy (object label form).
func IsRegressionClass(category string) bool { return regressionClassCategories[category] }
Loading
Loading