From 5ef8b2223446525c73daf6859bc2c7c70a3cb409 Mon Sep 17 00:00:00 2001 From: "homeboy-ci[bot]" <266378653+homeboy-ci[bot]@users.noreply.github.com> Date: Thu, 25 Jun 2026 22:25:43 -0400 Subject: [PATCH] Fix artifact cleanup child drainability --- .../DataMachineJobCleanupRunEvidenceStore.php | 62 ++++-- inc/Cli/Commands/WorkspaceCommand.php | 13 +- inc/Support/SystemTaskDrainability.php | 205 ++++++++++++++++++ inc/Tasks/WorkspaceRetentionCleanupTask.php | 6 + .../artifact-cleanup-drainability-repair.php | 124 +++++++++++ 5 files changed, 385 insertions(+), 25 deletions(-) create mode 100644 inc/Support/SystemTaskDrainability.php create mode 100644 tests/artifact-cleanup-drainability-repair.php diff --git a/inc/Cleanup/DataMachineJobCleanupRunEvidenceStore.php b/inc/Cleanup/DataMachineJobCleanupRunEvidenceStore.php index 426c842f..07fdef99 100644 --- a/inc/Cleanup/DataMachineJobCleanupRunEvidenceStore.php +++ b/inc/Cleanup/DataMachineJobCleanupRunEvidenceStore.php @@ -7,6 +7,8 @@ namespace DataMachineCode\Cleanup; +use DataMachineCode\Support\SystemTaskDrainability; + defined('ABSPATH') || exit; class DataMachineJobCleanupRunEvidenceStore implements CleanupRunEvidenceStoreInterface { @@ -126,19 +128,20 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array { 'remaining_safely_removable_worktrees' => 0, ), 'children' => array( - 'batch_job_ids' => array(), - 'chunk_job_ids' => array(), - 'pending_job_ids' => array(), - 'processing_job_ids' => array(), - 'failed_job_ids' => array(), - 'processing' => 0, - 'completed' => 0, - 'failed' => 0, - 'skipped' => 0, - 'running' => 0, - 'total' => 0, - 'statuses' => array(), - 'job_ids' => array(), + 'batch_job_ids' => array(), + 'chunk_job_ids' => array(), + 'pending_job_ids' => array(), + 'pending_without_drainable_action_job_ids' => array(), + 'processing_job_ids' => array(), + 'failed_job_ids' => array(), + 'processing' => 0, + 'completed' => 0, + 'failed' => 0, + 'skipped' => 0, + 'running' => 0, + 'total' => 0, + 'statuses' => array(), + 'job_ids' => array(), ), ); @@ -162,6 +165,9 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array { $summary['children']['job_ids'][] = $child_job_id; if ( 'pending' === $status ) { $summary['children']['pending_job_ids'][] = $child_job_id; + if ( ! SystemTaskDrainability::job_has_execute_step_action($child_job_id) ) { + $summary['children']['pending_without_drainable_action_job_ids'][] = $child_job_id; + } } elseif ( 'processing' === $status && ! $idle_wrapper ) { $summary['children']['processing_job_ids'][] = $child_job_id; } elseif ( str_starts_with($status, 'failed') ) { @@ -204,11 +210,12 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array { $summary['cleanup_items']['freed_human'] = $this->format_bytes($summary['cleanup_items']['bytes_reclaimed']); $summary['children']['batch_job_ids'] = array_values(array_unique($summary['children']['batch_job_ids'])); $summary['children']['chunk_job_ids'] = array_values(array_unique($summary['children']['chunk_job_ids'])); - $summary['children']['pending_job_ids'] = array_values(array_unique($summary['children']['pending_job_ids'])); - $summary['children']['processing_job_ids'] = array_values(array_unique($summary['children']['processing_job_ids'])); - $summary['children']['failed_job_ids'] = array_values(array_unique($summary['children']['failed_job_ids'])); - $summary['children']['job_ids'] = array_values(array_unique($summary['children']['job_ids'])); - $summary['children']['running'] = (int) $summary['children']['processing']; + $summary['children']['pending_job_ids'] = array_values(array_unique($summary['children']['pending_job_ids'])); + $summary['children']['pending_without_drainable_action_job_ids'] = array_values(array_unique($summary['children']['pending_without_drainable_action_job_ids'])); + $summary['children']['processing_job_ids'] = array_values(array_unique($summary['children']['processing_job_ids'])); + $summary['children']['failed_job_ids'] = array_values(array_unique($summary['children']['failed_job_ids'])); + $summary['children']['job_ids'] = array_values(array_unique($summary['children']['job_ids'])); + $summary['children']['running'] = (int) $summary['children']['processing']; return $summary; } @@ -224,6 +231,7 @@ private function summarize_cleanup_children( array $children ): array { $batch_ids = (array) ( $children['batch_job_ids'] ?? array() ); $chunk_ids = (array) ( $children['chunk_job_ids'] ?? array() ); $pending = (array) ( $children['pending_job_ids'] ?? array() ); + $undrainable = (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() ); $processing = (array) ( $children['processing_job_ids'] ?? array() ); return array( @@ -238,6 +246,7 @@ private function summarize_cleanup_children( array $children ): array { 'chunk_total' => count($chunk_ids), 'failed_job_ids' => (array) ( $children['failed_job_ids'] ?? array() ), 'pending_job_ids' => array_slice($pending, 0, $limit), + 'pending_without_drainable_action_job_ids' => array_slice($undrainable, 0, $limit), 'processing_job_ids' => array_slice($processing, 0, $limit), 'pending_truncated' => count($pending) > $limit, 'processing_truncated' => count($processing) > $limit, @@ -277,13 +286,24 @@ private function cleanup_run_drain_summary( int $job_id, string $state, array $c if ( array() !== $active_child_ids ) { $commands['active_children'] = sprintf('studio wp datamachine drain --job-id=%s', implode(',', $active_child_ids)); } + $undrainable_child_ids = array_values( + array_unique( + array_filter( + array_map('intval', (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() )) + ) + ) + ); + if ( array() !== $undrainable_child_ids ) { + $commands['repair_undrainable_children'] = sprintf('studio wp datamachine-code workspace cleanup resume %s --format=json', $run_id); + } return array( 'needed' => in_array($state, array( 'running', 'waiting_on_children' ), true), 'commands' => $commands, - 'active_child_job_ids' => $active_child_ids, - 'bytes_reclaimed' => (int) ( $cleanup_items['bytes_reclaimed'] ?? 0 ), - 'freed_human' => (string) ( $cleanup_items['freed_human'] ?? $this->format_bytes(0) ), + 'active_child_job_ids' => $active_child_ids, + 'undrainable_child_job_ids' => $undrainable_child_ids, + 'bytes_reclaimed' => (int) ( $cleanup_items['bytes_reclaimed'] ?? 0 ), + 'freed_human' => (string) ( $cleanup_items['freed_human'] ?? $this->format_bytes(0) ), ); } diff --git a/inc/Cli/Commands/WorkspaceCommand.php b/inc/Cli/Commands/WorkspaceCommand.php index 6e6ad235..4464ea5b 100644 --- a/inc/Cli/Commands/WorkspaceCommand.php +++ b/inc/Cli/Commands/WorkspaceCommand.php @@ -1310,13 +1310,18 @@ private function cleanup_run_control_job_ids( string $operation, int $job_id ): return array( $job_id ); } - $children = (array) ( $output['evidence']['children'] ?? array() ); - $processing_ids = array_map('intval', (array) ( $children['processing_job_ids'] ?? array() )); - $failed_ids = array_map('intval', (array) ( $children['failed_job_ids'] ?? array() )); - $pending_ids = array_map('intval', (array) ( $children['pending_job_ids'] ?? array() )); + $children = (array) ( $output['evidence']['children'] ?? array() ); + $processing_ids = array_map('intval', (array) ( $children['processing_job_ids'] ?? array() )); + $failed_ids = array_map('intval', (array) ( $children['failed_job_ids'] ?? array() )); + $pending_ids = array_map('intval', (array) ( $children['pending_job_ids'] ?? array() )); + $undrainable_ids = array_map('intval', (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() )); if ( 'resume' === $operation ) { + $repair = \DataMachineCode\Support\SystemTaskDrainability::ensure_jobs_have_execute_step_actions($undrainable_ids); $child_targets = array_values(array_unique(array_filter(array_merge($processing_ids, $failed_ids)))); + if ( array() === $child_targets && (int) ( $repair['repaired'] ?? 0 ) > 0 ) { + return array(); + } return array() !== $child_targets ? $child_targets : array( $job_id ); } diff --git a/inc/Support/SystemTaskDrainability.php b/inc/Support/SystemTaskDrainability.php new file mode 100644 index 00000000..04c39cf9 --- /dev/null +++ b/inc/Support/SystemTaskDrainability.php @@ -0,0 +1,205 @@ + 0 && self::job_has_execute_step_action($job_id); + } + + $flow_step_id = self::first_step_id($job_id); + if ( '' === $flow_step_id ) { + return false; + } + + $action_id = as_schedule_single_action( + time(), + self::EXECUTE_STEP_HOOK, + array( + 'job_id' => $job_id, + 'flow_step_id' => $flow_step_id, + ), + self::ACTION_GROUP + ); + + if ( false === $action_id ) { + return false; + } + + if ( function_exists('datamachine_merge_engine_data') ) { + datamachine_merge_engine_data( + $job_id, + array( + 'drainability_repair' => array( + 'action_id' => (int) $action_id, + 'hook' => self::EXECUTE_STEP_HOOK, + 'flow_step_id' => $flow_step_id, + 'repaired_at' => gmdate('c'), + ), + ) + ); + } + + return true; + } + + /** + * Ensure every supplied job has drainable work and return repair stats. + * + * @param array $job_ids Data Machine job IDs. + * @return array{checked:int,repaired:int,unrepairable:array} + */ + public static function ensure_jobs_have_execute_step_actions( array $job_ids ): array { + $checked = 0; + $repaired = 0; + $unrepairable = array(); + + foreach ( array_values(array_unique(array_map('intval', $job_ids))) as $job_id ) { + if ( $job_id <= 0 || ! self::is_pending_job($job_id) ) { + continue; + } + + ++$checked; + $had_action = self::job_has_execute_step_action($job_id); + if ( self::ensure_job_has_execute_step_action($job_id) ) { + if ( ! $had_action ) { + ++$repaired; + } + continue; + } + + $unrepairable[] = $job_id; + } + + return array( + 'checked' => $checked, + 'repaired' => $repaired, + 'unrepairable' => $unrepairable, + ); + } + + /** + * Return pending jobs from a list that lack a drainable execute-step action. + * + * @param array $job_ids Data Machine job IDs. + * @return array + */ + public static function pending_jobs_missing_execute_step_actions( array $job_ids ): array { + $missing = array(); + foreach ( array_values(array_unique(array_map('intval', $job_ids))) as $job_id ) { + if ( $job_id > 0 && self::is_pending_job($job_id) && ! self::job_has_execute_step_action($job_id) ) { + $missing[] = $job_id; + } + } + + return $missing; + } + + /** + * Determine whether a job has a pending execute-step action scoped to it. + * + * @param int $job_id Data Machine job ID. + * @return bool + */ + public static function job_has_execute_step_action( int $job_id ): bool { + if ( $job_id <= 0 || ! self::actions_table_available() ) { + return false; + } + + global $wpdb; + $actions_table = $wpdb->prefix . 'actionscheduler_actions'; + $groups_table = $wpdb->prefix . 'actionscheduler_groups'; + + // phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching -- Operational status check for Action Scheduler work owned by Data Machine. + $count = (int) $wpdb->get_var( + $wpdb->prepare( + "SELECT COUNT(*) + FROM %i a + INNER JOIN %i g ON g.group_id = a.group_id + WHERE a.hook = %s + AND a.status = 'pending' + AND g.slug = %s + AND (a.args LIKE %s OR a.args LIKE %s)", + $actions_table, + $groups_table, + self::EXECUTE_STEP_HOOK, + self::ACTION_GROUP, + '%"job_id":' . $job_id . ',%', + '%"job_id":' . $job_id . '}%' + ) + ); + // phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching + + return $count > 0; + } + + private static function is_pending_job( int $job_id ): bool { + $job = self::get_job($job_id); + return 'pending' === (string) ( $job['status'] ?? '' ); + } + + /** + * @return array + */ + private static function get_job( int $job_id ): array { + $ability = function_exists('wp_get_ability') ? wp_get_ability('datamachine/get-jobs') : null; + if ( ! $ability ) { + return array(); + } + + $result = $ability->execute(array( 'job_id' => $job_id )); + if ( ! ( $result['success'] ?? false ) || ! is_array($result['jobs'] ?? null) ) { + return array(); + } + + $job = $result['jobs'][0] ?? array(); + return is_array($job) ? $job : array(); + } + + private static function first_step_id( int $job_id ): string { + if ( ! function_exists('datamachine_get_engine_data') ) { + return ''; + } + + $engine_data = datamachine_get_engine_data($job_id); + $flow_config = is_array($engine_data['flow_config'] ?? null) ? $engine_data['flow_config'] : array(); + if ( array() === $flow_config ) { + return ''; + } + + if ( class_exists('\DataMachine\Engine\ExecutionPlan') ) { + try { + return (string) \DataMachine\Engine\ExecutionPlan::from_flow_config($flow_config)->first_step_id(); + } catch ( \InvalidArgumentException ) { + return ''; + } + } + + $keys = array_keys($flow_config); + return (string) ( $keys[0] ?? '' ); + } + + private static function actions_table_available(): bool { + global $wpdb; + return isset($wpdb) && is_object($wpdb) && isset($wpdb->prefix) && method_exists($wpdb, 'get_var') && method_exists($wpdb, 'prepare'); + } +} diff --git a/inc/Tasks/WorkspaceRetentionCleanupTask.php b/inc/Tasks/WorkspaceRetentionCleanupTask.php index 6025bd73..79c4166f 100644 --- a/inc/Tasks/WorkspaceRetentionCleanupTask.php +++ b/inc/Tasks/WorkspaceRetentionCleanupTask.php @@ -10,6 +10,7 @@ use DataMachine\Core\PluginSettings; use DataMachine\Engine\AI\System\Tasks\SystemTask; use DataMachine\Engine\Tasks\TaskScheduler; +use DataMachineCode\Support\SystemTaskDrainability; use DataMachineCode\Workspace\Workspace; defined('ABSPATH') || exit; @@ -207,6 +208,10 @@ private function schedule_job_backed_cleanup( int $jobId, Workspace $workspace, return new \WP_Error('cleanup_chunk_schedule_failed', 'Failed to schedule cleanup chunk jobs.', array( 'status' => 500 )); } + $drainability = SystemTaskDrainability::ensure_jobs_have_execute_step_actions( + is_array($batch['job_ids'] ?? null) ? $batch['job_ids'] : array() + ); + return array( 'success' => true, 'dry_run' => false, @@ -237,6 +242,7 @@ private function schedule_job_backed_cleanup( int $jobId, Workspace $workspace, 'planned_handles' => $this->cleanup_chunk_handles($chunk_rows), 'batch_job_id' => (int) ( $batch['batch_job_id'] ?? 0 ), 'direct_job_ids' => $batch['job_ids'] ?? array(), + 'drainability' => $drainability, ), ); } diff --git a/tests/artifact-cleanup-drainability-repair.php b/tests/artifact-cleanup-drainability-repair.php new file mode 100644 index 00000000..20dcca46 --- /dev/null +++ b/tests/artifact-cleanup-drainability-repair.php @@ -0,0 +1,124 @@ + array( + 'job_id' => 2095, + 'source' => 'system', + 'status' => 'pending', + 'engine_data' => array( + 'task_type' => 'worktree_cleanup_chunk', + 'task_params' => array( + 'chunk_type' => 'artifacts', + ), + 'flow_config' => array( + 'step-system-task-1' => array( + 'step_type' => 'system_task', + ), + ), + ), + ), + ); + $GLOBALS['artifact_cleanup_test_actions'] = array(); + $GLOBALS['artifact_cleanup_test_merges'] = array(); + + final class ArtifactCleanupDrainabilityWpdb { + public string $prefix = 'wp_'; + + public function prepare( string $query, mixed ...$args ): array { + return array( + 'query' => $query, + 'args' => $args, + ); + } + + public function get_var( mixed $prepared ): int { + $args = is_array($prepared) ? (array) ( $prepared['args'] ?? array() ) : array(); + $hook = (string) ( $args[2] ?? '' ); + $group = (string) ( $args[3] ?? '' ); + $job_pattern = (string) ( $args[4] ?? '' ); + preg_match('/"job_id":(\d+)/', $job_pattern, $matches); + $job_id = (int) ( $matches[1] ?? 0 ); + + $count = 0; + foreach ( (array) $GLOBALS['artifact_cleanup_test_actions'] as $action ) { + if ( + $hook === (string) ( $action['hook'] ?? '' ) + && $group === (string) ( $action['group'] ?? '' ) + && $job_id === (int) ( $action['args']['job_id'] ?? 0 ) + ) { + ++$count; + } + } + + return $count; + } + } + + $GLOBALS['wpdb'] = new ArtifactCleanupDrainabilityWpdb(); + + final class ArtifactCleanupDrainabilityJobsAbility { + public function execute( array $input ): array { + $job_id = (int) ( $input['job_id'] ?? 0 ); + $job = $GLOBALS['artifact_cleanup_test_jobs'][ $job_id ] ?? null; + + return array( + 'success' => null !== $job, + 'jobs' => null !== $job ? array( $job ) : array(), + ); + } + } + + function wp_get_ability( string $name ): ?ArtifactCleanupDrainabilityJobsAbility { + return 'datamachine/get-jobs' === $name ? new ArtifactCleanupDrainabilityJobsAbility() : null; + } + + function datamachine_get_engine_data( int $job_id ): array { + return (array) ( $GLOBALS['artifact_cleanup_test_jobs'][ $job_id ]['engine_data'] ?? array() ); + } + + function datamachine_merge_engine_data( int $job_id, array $data ): void { + $GLOBALS['artifact_cleanup_test_merges'][ $job_id ][] = $data; + } + + function as_schedule_single_action( int $timestamp, string $hook, array $args, string $group ): int { + $GLOBALS['artifact_cleanup_test_actions'][] = compact('timestamp', 'hook', 'args', 'group'); + return count($GLOBALS['artifact_cleanup_test_actions']); + } + + function artifact_cleanup_drainability_assert_same( mixed $expected, mixed $actual, string $message ): void { + if ( $expected !== $actual ) { + throw new RuntimeException(sprintf("%s\nExpected: %s\nActual: %s", $message, var_export($expected, true), var_export($actual, true))); + } + } + + artifact_cleanup_drainability_assert_same( + array( 2095 ), + SystemTaskDrainability::pending_jobs_missing_execute_step_actions(array( 2095 )), + 'Pending artifact cleanup chunk should be reported when no drainable execute-step action exists.' + ); + + $result = SystemTaskDrainability::ensure_jobs_have_execute_step_actions(array( 2095 )); + artifact_cleanup_drainability_assert_same(1, $result['checked'], 'Repair should inspect the pending child job.'); + artifact_cleanup_drainability_assert_same(1, $result['repaired'], 'Repair should schedule the missing execute-step action.'); + artifact_cleanup_drainability_assert_same('datamachine_execute_step', $GLOBALS['artifact_cleanup_test_actions'][0]['hook'] ?? null, 'Repair should use the drainable Data Machine execute-step hook.'); + artifact_cleanup_drainability_assert_same(2095, $GLOBALS['artifact_cleanup_test_actions'][0]['args']['job_id'] ?? null, 'Repair action should target the stuck child job.'); + artifact_cleanup_drainability_assert_same('step-system-task-1', $GLOBALS['artifact_cleanup_test_actions'][0]['args']['flow_step_id'] ?? null, 'Repair action should resume the job through its workflow first step.'); + artifact_cleanup_drainability_assert_same(array(), SystemTaskDrainability::pending_jobs_missing_execute_step_actions(array( 2095 )), 'Repaired child job should no longer be reported as missing drainable work.'); + + $result = SystemTaskDrainability::ensure_jobs_have_execute_step_actions(array( 2095 )); + artifact_cleanup_drainability_assert_same(0, $result['repaired'], 'Existing execute-step action should not be duplicated.'); + artifact_cleanup_drainability_assert_same(1, count($GLOBALS['artifact_cleanup_test_actions']), 'Repair should schedule at most one action for the child job.'); + + echo "artifact cleanup drainability repair test passed.\n"; +}