Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/cli/coverage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ func TestExitError_Methods(t *testing.T) {
}
}

// TestRunDatasetRm_InvalidTableExitsTwo: rm validates the table name
// before any cluster work, so an unsafe name is exit-code-2 territory
// and never reaches kubeconfig/cluster resolution.
func TestRunDatasetRm_InvalidTableExitsTwo(t *testing.T) {
var buf bytes.Buffer
err := runDatasetRm(context.Background(), runDatasetRmArgs{
Table: "../bad",
Printer: ui.New(&buf, ui.WithColor(false)),
})
var ee *exitError
if !errors.As(err, &ee) {
t.Fatalf("error is not an *exitError: %v", err)
}
if ee.Code() != 2 {
t.Errorf("exit code = %d, want 2 (invalid table name)", ee.Code())
}
}

// TestRunClusterInfo_BadKubeconfigExitsThree: an unreadable/invalid
// kubeconfig is exit-code-3 territory (the kubeconfig/local-input
// bucket), surfaced before any cluster work. Covers the Load-error
Expand Down
6 changes: 4 additions & 2 deletions internal/cli/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
// verb is `push`, completed in Phase 3 (tracebloc/client#151) across
// PR-a (pre-flight: spec synth, validation, layout walk, cluster
// discovery) and PR-b (this one: ephemeral stage Pod + tar-over-
// exec stream + progress bar + SIGINT-safe cleanup). Future verbs
// (`dataset list`, `dataset rm`) hang off this parent in v0.2.
// exec stream + progress bar + SIGINT-safe cleanup). `dataset rm`
// (#30) removes a pushed dataset's in-cluster artifacts; `dataset
// list` hangs off this parent later.
func newDatasetCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "dataset",
Expand All @@ -39,6 +40,7 @@ ingestor Job to completion (streaming logs + the final summary).
before the first push.`,
}
cmd.AddCommand(newDatasetPushCmd())
cmd.AddCommand(newDatasetRmCmd())
return cmd
}

Expand Down
186 changes: 186 additions & 0 deletions internal/cli/dataset_rm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package cli

import (
"context"
"errors"
"fmt"

"github.com/spf13/cobra"

"github.com/tracebloc/cli/internal/cluster"
"github.com/tracebloc/cli/internal/push"
"github.com/tracebloc/cli/internal/ui"
)

// runDatasetRmArgs is the resolved input to runDatasetRm — same shape
// convention as runDatasetPushArgs, so the command's RunE stays a thin
// flag-to-struct adapter and the logic is unit-testable.
type runDatasetRmArgs struct {
// Table is the name of the dataset's table. runDatasetRm validates it
// with push.ValidateTableName before doing anything else.
Table string
// Kubeconfig, Context and Namespace are handed to cluster.Load as the
// kubeconfig path, the context name and the namespace respectively.
Kubeconfig string
Context string
Namespace string
// DryRun makes runDatasetRm stop once the plan has been printed,
// before the confirmation step and before anything is deleted.
DryRun bool
// Yes skips the confirmation prompt.
Yes bool
// Printer is where runDatasetRm writes all of its output.
Printer *ui.Printer
// Prompter asks for confirmation when Yes is false. If it is nil and
// Yes is false, runDatasetRm refuses to delete and returns exit code 3.
Prompter prompter // nil off a TTY or when --yes is set
}

// newDatasetRmCmd builds the `tracebloc dataset rm <table>` command,
// which tears down the in-cluster artifacts of a previously-pushed
// dataset. The command itself only maps flags and the single positional
// argument onto runDatasetRmArgs; the work happens in runDatasetRm, and
// the teardown mechanism (plus the design note on CLI-direct teardown vs
// a server-side delete endpoint) lives in internal/push.Teardown.
func newDatasetRmCmd() *cobra.Command {
	// Flag destinations, read by the run closure once cobra has parsed
	// the command line.
	var opts struct {
		kubeconfig  string
		kubeContext string
		namespace   string
		dryRun      bool
		yes         bool
	}

	run := func(cmd *cobra.Command, args []string) error {
		in := runDatasetRmArgs{
			Table:      args[0],
			Kubeconfig: opts.kubeconfig,
			Context:    opts.kubeContext,
			Namespace:  opts.namespace,
			DryRun:     opts.dryRun,
			Yes:        opts.yes,
		}
		// Only offer an interactive confirmation on a TTY, and only when
		// --yes was not passed; otherwise Prompter stays nil.
		if !opts.yes && isInteractiveTTY() {
			in.Prompter = surveyPrompter{}
		}
		ctx := cmd.Context()
		in.Printer = printerFor(cmd)
		return runDatasetRm(ctx, in)
	}

	cmd := &cobra.Command{
		Use:   "rm <table>",
		Short: "Delete a pushed dataset's in-cluster artifacts (table + PVC files)",
		Long: `Removes the in-cluster artifacts a previous ` + "`dataset push`" + ` created
for a table: the MySQL table in ` + push.IngestionDatabase + ` and the dataset's
directories on the shared PVC. Destructive and not undoable.

NOTE: the central tracebloc backend catalog entry is NOT removed — the
CLI has no direct line to that backend. Full cleanup of a successfully
ingested dataset needs the server-side delete path (tracebloc/cli#39).

Exit codes:
0 artifacts removed (or --dry-run, or the user declined)
2 invalid table name
3 kubeconfig error, or refused (no confirmation off a terminal)
4 cluster reachable but parent release / shared PVC missing
7 teardown failed mid-flight (table drop or PVC rm errored)`,
		Args: cobra.ExactArgs(1),
		RunE: run,
	}

	fs := cmd.Flags()
	fs.StringVar(&opts.kubeconfig, "kubeconfig", "",
		"path to the kubeconfig file (default: $KUBECONFIG, then ~/.kube/config)")
	fs.StringVar(&opts.kubeContext, "context", "",
		"name of the kubeconfig context to use (default: kubeconfig's current-context)")
	fs.StringVarP(&opts.namespace, "namespace", "n", "",
		"namespace where the parent tracebloc/client release is installed")
	fs.BoolVar(&opts.dryRun, "dry-run", false,
		"show what would be deleted without deleting anything")
	fs.BoolVarP(&opts.yes, "yes", "y", false,
		"skip the confirmation prompt (required when not on a terminal)")

	return cmd
}

// runDatasetRm is the testable core of `dataset rm`: it validates the
// table name, resolves the cluster, prints the teardown plan, asks for
// confirmation and then removes the in-cluster artifacts. The stages
// follow the same validate → discover → plan → act order as
// runDatasetPush.
//
// Failures are returned as *exitError: code 2 for an invalid table
// name, 3 for kubeconfig/clientset errors, a refused confirmation or a
// prompt failure, 4 when the parent release or shared PVC cannot be
// found, and 7 when the teardown itself fails. A dry run and a declined
// or cancelled prompt return nil.
func runDatasetRm(ctx context.Context, a runDatasetRmArgs) error {
	out := a.Printer
	out.Banner("tracebloc", "delete a pushed dataset")

	// Name check comes first: PVC paths are derived from the name, and
	// push.PlanTeardown panics on an unsafe one by design.
	if verr := push.ValidateTableName(a.Table); verr != nil {
		return &exitError{code: 2, err: fmt.Errorf("invalid table name %q: %w", a.Table, verr)}
	}

	// Kubeconfig → clientset. Anything going wrong here is exit 3.
	kubeOpts := cluster.KubeconfigOptions{
		Path:      a.Kubeconfig,
		Context:   a.Context,
		Namespace: a.Namespace,
	}
	resolved, err := cluster.Load(kubeOpts)
	if err != nil {
		return &exitError{code: 3, err: fmt.Errorf("loading kubeconfig: %w", err)}
	}
	clientset, err := cluster.NewClientset(resolved)
	if err != nil {
		return &exitError{code: 3, err: err}
	}

	// The parent release and the shared PVC must both exist (exit 4
	// otherwise). They give the user "is this the right cluster?"
	// context and stop a teardown against a cluster that has no
	// tracebloc install.
	release, err := cluster.DiscoverParentRelease(ctx, clientset, resolved.Namespace)
	if err != nil {
		return &exitError{code: 4, err: err}
	}
	sharedPVC, err := cluster.DiscoverSharedPVC(ctx, clientset, resolved.Namespace)
	if err != nil {
		return &exitError{code: 4, err: err}
	}

	// Print exactly what is about to go — the last look before
	// destructive, unrecoverable work.
	plan := push.PlanTeardown(a.Table)

	out.Section("Target")
	out.Field("context", resolved.Context)
	out.Field("namespace", resolved.Namespace)
	out.Field("release", release.ReleaseName)
	out.Field("shared PVC", sharedPVC.ClaimName)

	out.Section("Will delete")
	qualifiedTable := plan.Database + "." + plan.Table
	out.Field("mysql table", qualifiedTable)
	for i := range plan.PVCPaths {
		out.Field("pvc path", plan.PVCPaths[i])
	}
	out.Warnf("Destructive and cannot be undone. The central backend catalog entry is NOT removed (tracebloc/cli#39).")

	// A dry run ends here, after the plan has been shown.
	if a.DryRun {
		out.Successf("Dry-run — nothing was deleted.")
		return nil
	}

	// Confirmation. --yes bypasses it; with neither --yes nor a
	// prompter (i.e. off a TTY) we refuse instead of deleting unasked.
	if !a.Yes {
		if a.Prompter == nil {
			return &exitError{code: 3, err: errors.New(
				"refusing to delete without confirmation: pass --yes or run on a terminal")}
		}
		confirmed, cerr := a.Prompter.Confirm(fmt.Sprintf("Delete %q and its files?", a.Table), false)
		switch {
		case cerr != nil && !errors.Is(cerr, errInteractiveCancelled):
			return &exitError{code: 3, err: cerr}
		case cerr != nil || !confirmed:
			// Either the prompt was cancelled or the answer was "no".
			out.Infof("Cancelled — nothing was deleted.")
			return nil
		}
	}

	// Run the in-cluster teardown.
	out.Infof("Removing in-cluster artifacts…")
	result, err := push.Teardown(ctx, clientset, resolved.RestConfig, resolved.Namespace, plan)
	if err != nil {
		return &exitError{code: 7, err: fmt.Errorf("teardown failed: %w", err)}
	}

	out.Successf("Deleted %s.%s and %d PVC path(s).", plan.Database, plan.Table, len(result.RemovedPaths))
	return nil
}
125 changes: 125 additions & 0 deletions internal/push/teardown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package push

import (
"bytes"
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// IngestionDatabase is the MySQL schema jobs-manager ingests tables
// into (data-ingestors' target DB). Centralized here so the push path
// and the teardown path agree on where a table lives. PlanTeardown
// copies it into TeardownPlan.Database, which Teardown uses as the
// schema qualifier of its DROP TABLE statement.
const IngestionDatabase = "training_test_datasets"

// TeardownPlan enumerates the in-cluster artifacts `dataset rm` removes
// for a pushed table: the MySQL table and the dataset's directories on
// the shared PVC. PlanTeardown builds one; Teardown executes it.
//
// It deliberately does NOT include the central tracebloc backend
// catalog entry: the CLI has no direct line to that backend (only the
// in-cluster ingestor does, with its own creds), so removing it is the
// cross-repo follow-up (tracebloc/cli#39). A successfully-ingested
// dataset torn down this way leaves a stale catalog entry until #39
// lands.
//
// Database and Table are interpolated into a shell-quoted SQL statement
// by Teardown, which is why Table must already be validated.
type TeardownPlan struct {
Database string // MySQL schema (IngestionDatabase)
Table string // table name — MUST have passed ValidateTableName
PVCPaths []string // absolute dirs on the shared PVC to rm -rf
}

// PlanTeardown builds the TeardownPlan for table: the table inside
// IngestionDatabase, plus its final-destination and staging directories
// on the shared PVC (in that order).
//
// FinalDestPrefix and StagedPrefix panic on an unsafe name, so callers
// must run ValidateTableName first (cli.runDatasetRm does).
func PlanTeardown(table string) TeardownPlan {
	pvcDirs := []string{
		FinalDestPrefix(table),
		StagedPrefix(table),
	}

	var plan TeardownPlan
	plan.Database = IngestionDatabase
	plan.Table = table
	plan.PVCPaths = pvcDirs
	return plan
}

// TeardownResult reports what Teardown actually removed. Teardown also
// returns it alongside a non-nil error, in which case it records how
// far the teardown got before failing.
type TeardownResult struct {
// DroppedTable is set once the DROP TABLE exec has succeeded.
DroppedTable bool
// RemovedPaths is the plan's PVCPaths, set once the rm exec has
// succeeded; it stays nil if the teardown failed before that point.
RemovedPaths []string
}

// Teardown carries out plan inside the cluster, doing by exec what an
// operator would otherwise do by hand with kubectl:
//
//   - First the MySQL table is dropped by running `mysql` inside the
//     mysql pod. The command refers to the pod's own
//     $MYSQL_ROOT_PASSWORD, so no database credential passes through
//     the CLI.
//   - Then the PVC directories are removed with `rm -rf` inside the
//     jobs-manager pod, which mounts the shared PVC at SharedRoot.
//
// The steps run in that order and the first failure aborts the rest.
// The returned TeardownResult reflects whatever completed before the
// error, so DroppedTable can be true on a non-nil error.
//
// DESIGN NOTE (under review): exec-ing into existing pods is the
// "CLI-direct teardown". The alternative being discussed is a
// server-side jobs-manager delete endpoint, which could remove the
// backend catalog entry (#39) in the same place. This implementation
// assumes (a) a pod whose name contains "mysql" exposes
// $MYSQL_ROOT_PASSWORD and (b) the jobs-manager pod mounts the shared
// PVC at SharedRoot. Both hold for the current parent chart but should
// be confirmed before this ships.
func Teardown(ctx context.Context, cs kubernetes.Interface, cfg *rest.Config, namespace string, plan TeardownPlan) (TeardownResult, error) {
	var (
		result TeardownResult
		runner = &SPDYExecutor{Config: cfg, Client: cs}
	)

	// Step 1: drop the table from inside the mysql pod, over localhost,
	// authenticating with the pod's own root password.
	dbPod, dbContainer, err := findRunningPod(ctx, cs, namespace, "mysql")
	if err != nil {
		return result, fmt.Errorf("locating mysql pod: %w", err)
	}
	dropStmt := fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", plan.Database, plan.Table)
	dropArgv := []string{
		"sh", "-c",
		fmt.Sprintf(`mysql -uroot -p"$MYSQL_ROOT_PASSWORD" -e '%s'`, dropStmt),
	}
	var dropErrOut bytes.Buffer
	if err := runner.Exec(ctx, namespace, dbPod, dbContainer, dropArgv, nil, nil, &dropErrOut); err != nil {
		return result, fmt.Errorf("dropping %s.%s: %w%s", plan.Database, plan.Table, err, stderrSuffix(&dropErrOut))
	}
	result.DroppedTable = true

	// Step 2: remove the dataset directories from inside the
	// jobs-manager pod, which has the shared PVC mounted.
	fsPod, fsContainer, err := findRunningPod(ctx, cs, namespace, "jobs-manager")
	if err != nil {
		return result, fmt.Errorf("locating jobs-manager pod: %w", err)
	}
	rmArgv := make([]string, 0, 2+len(plan.PVCPaths))
	rmArgv = append(rmArgv, "rm", "-rf")
	rmArgv = append(rmArgv, plan.PVCPaths...)
	var rmErrOut bytes.Buffer
	if err := runner.Exec(ctx, namespace, fsPod, fsContainer, rmArgv, nil, nil, &rmErrOut); err != nil {
		return result, fmt.Errorf("removing PVC paths: %w%s", err, stderrSuffix(&rmErrOut))
	}
	result.RemovedPaths = plan.PVCPaths
	return result, nil
}

// findRunningPod returns the name and first-container name of the first
// pod in namespace that is Running, is not being deleted, has at least
// one container, and whose name contains substr.
//
// Phase alone is not a sufficient filter: a pod that has been asked to
// terminate keeps reporting Phase=Running until its containers exit.
// Pods with a DeletionTimestamp are therefore skipped, so the caller
// never execs into a pod that is on its way out (for example the old
// replica during a rollout) when a healthy one is available.
//
// It returns the List error unchanged if listing pods fails, and a
// descriptive error if no pod matches.
func findRunningPod(ctx context.Context, cs kubernetes.Interface, namespace, substr string) (podName, container string, err error) {
	pods, err := cs.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return "", "", err
	}
	for i := range pods.Items {
		p := &pods.Items[i] // index instead of copying each Pod
		if p.Status.Phase != corev1.PodRunning {
			continue
		}
		if p.DeletionTimestamp != nil {
			// Terminating: still Running by phase, but shutting down.
			continue
		}
		if !strings.Contains(p.Name, substr) || len(p.Spec.Containers) == 0 {
			continue
		}
		return p.Name, p.Spec.Containers[0].Name, nil
	}
	return "", "", fmt.Errorf("no Running pod with name containing %q in namespace %q", substr, namespace)
}

// stderrSuffix formats the stderr captured from a remote command for
// appending to an error message: the whitespace-trimmed text wrapped as
// " (text)", or "" if nothing but whitespace was captured.
func stderrSuffix(b *bytes.Buffer) string {
	if captured := strings.TrimSpace(b.String()); captured != "" {
		return " (" + captured + ")"
	}
	return ""
}
30 changes: 30 additions & 0 deletions internal/push/teardown_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package push

import "testing"

// TestPlanTeardown locks down what `dataset rm` targets for a table:
// the table inside IngestionDatabase and the two PVC directories —
// final destination first, staging second.
func TestPlanTeardown(t *testing.T) {
	got := PlanTeardown("reg_train")

	if got.Database != IngestionDatabase {
		t.Errorf("Database = %q, want %q", got.Database, IngestionDatabase)
	}
	if got.Table != "reg_train" {
		t.Errorf("Table = %q, want reg_train", got.Table)
	}

	wantPaths := []string{
		"/data/shared/reg_train",
		"/data/shared/.tracebloc-staging/reg_train",
	}
	// A length mismatch is fatal so the element loop below cannot index
	// out of range.
	if len(got.PVCPaths) != len(wantPaths) {
		t.Fatalf("PVCPaths = %v, want %v", got.PVCPaths, wantPaths)
	}
	for i, wantPath := range wantPaths {
		if gotPath := got.PVCPaths[i]; gotPath != wantPath {
			t.Errorf("PVCPaths[%d] = %q, want %q", i, gotPath, wantPath)
		}
	}
}
Loading