Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions inc/Cleanup/DataMachineJobCleanupRunEvidenceStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

namespace DataMachineCode\Cleanup;

use DataMachineCode\Support\SystemTaskDrainability;

defined('ABSPATH') || exit;

class DataMachineJobCleanupRunEvidenceStore implements CleanupRunEvidenceStoreInterface {
Expand Down Expand Up @@ -126,19 +128,20 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array {
'remaining_safely_removable_worktrees' => 0,
),
'children' => array(
'batch_job_ids' => array(),
'chunk_job_ids' => array(),
'pending_job_ids' => array(),
'processing_job_ids' => array(),
'failed_job_ids' => array(),
'processing' => 0,
'completed' => 0,
'failed' => 0,
'skipped' => 0,
'running' => 0,
'total' => 0,
'statuses' => array(),
'job_ids' => array(),
'batch_job_ids' => array(),
'chunk_job_ids' => array(),
'pending_job_ids' => array(),
'pending_without_drainable_action_job_ids' => array(),
'processing_job_ids' => array(),
'failed_job_ids' => array(),
'processing' => 0,
'completed' => 0,
'failed' => 0,
'skipped' => 0,
'running' => 0,
'total' => 0,
'statuses' => array(),
'job_ids' => array(),
),
);

Expand All @@ -162,6 +165,9 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array {
$summary['children']['job_ids'][] = $child_job_id;
if ( 'pending' === $status ) {
$summary['children']['pending_job_ids'][] = $child_job_id;
if ( ! SystemTaskDrainability::job_has_execute_step_action($child_job_id) ) {
$summary['children']['pending_without_drainable_action_job_ids'][] = $child_job_id;
}
} elseif ( 'processing' === $status && ! $idle_wrapper ) {
$summary['children']['processing_job_ids'][] = $child_job_id;
} elseif ( str_starts_with($status, 'failed') ) {
Expand Down Expand Up @@ -204,11 +210,12 @@ private function aggregate_cleanup_child_jobs( array $child_jobs ): array {
$summary['cleanup_items']['freed_human'] = $this->format_bytes($summary['cleanup_items']['bytes_reclaimed']);
$summary['children']['batch_job_ids'] = array_values(array_unique($summary['children']['batch_job_ids']));
$summary['children']['chunk_job_ids'] = array_values(array_unique($summary['children']['chunk_job_ids']));
$summary['children']['pending_job_ids'] = array_values(array_unique($summary['children']['pending_job_ids']));
$summary['children']['processing_job_ids'] = array_values(array_unique($summary['children']['processing_job_ids']));
$summary['children']['failed_job_ids'] = array_values(array_unique($summary['children']['failed_job_ids']));
$summary['children']['job_ids'] = array_values(array_unique($summary['children']['job_ids']));
$summary['children']['running'] = (int) $summary['children']['processing'];
$summary['children']['pending_job_ids'] = array_values(array_unique($summary['children']['pending_job_ids']));
$summary['children']['pending_without_drainable_action_job_ids'] = array_values(array_unique($summary['children']['pending_without_drainable_action_job_ids']));
$summary['children']['processing_job_ids'] = array_values(array_unique($summary['children']['processing_job_ids']));
$summary['children']['failed_job_ids'] = array_values(array_unique($summary['children']['failed_job_ids']));
$summary['children']['job_ids'] = array_values(array_unique($summary['children']['job_ids']));
$summary['children']['running'] = (int) $summary['children']['processing'];

return $summary;
}
Expand All @@ -224,6 +231,7 @@ private function summarize_cleanup_children( array $children ): array {
$batch_ids = (array) ( $children['batch_job_ids'] ?? array() );
$chunk_ids = (array) ( $children['chunk_job_ids'] ?? array() );
$pending = (array) ( $children['pending_job_ids'] ?? array() );
$undrainable = (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() );
$processing = (array) ( $children['processing_job_ids'] ?? array() );

return array(
Expand All @@ -238,6 +246,7 @@ private function summarize_cleanup_children( array $children ): array {
'chunk_total' => count($chunk_ids),
'failed_job_ids' => (array) ( $children['failed_job_ids'] ?? array() ),
'pending_job_ids' => array_slice($pending, 0, $limit),
'pending_without_drainable_action_job_ids' => array_slice($undrainable, 0, $limit),
'processing_job_ids' => array_slice($processing, 0, $limit),
'pending_truncated' => count($pending) > $limit,
'processing_truncated' => count($processing) > $limit,
Expand Down Expand Up @@ -277,13 +286,24 @@ private function cleanup_run_drain_summary( int $job_id, string $state, array $c
if ( array() !== $active_child_ids ) {
$commands['active_children'] = sprintf('studio wp datamachine drain --job-id=%s', implode(',', $active_child_ids));
}
$undrainable_child_ids = array_values(
array_unique(
array_filter(
array_map('intval', (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() ))
)
)
);
if ( array() !== $undrainable_child_ids ) {
$commands['repair_undrainable_children'] = sprintf('studio wp datamachine-code workspace cleanup resume %s --format=json', $run_id);
}

return array(
'needed' => in_array($state, array( 'running', 'waiting_on_children' ), true),
'commands' => $commands,
'active_child_job_ids' => $active_child_ids,
'bytes_reclaimed' => (int) ( $cleanup_items['bytes_reclaimed'] ?? 0 ),
'freed_human' => (string) ( $cleanup_items['freed_human'] ?? $this->format_bytes(0) ),
'active_child_job_ids' => $active_child_ids,
'undrainable_child_job_ids' => $undrainable_child_ids,
'bytes_reclaimed' => (int) ( $cleanup_items['bytes_reclaimed'] ?? 0 ),
'freed_human' => (string) ( $cleanup_items['freed_human'] ?? $this->format_bytes(0) ),
);
}

Expand Down
13 changes: 9 additions & 4 deletions inc/Cli/Commands/WorkspaceCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -1310,13 +1310,18 @@ private function cleanup_run_control_job_ids( string $operation, int $job_id ):
return array( $job_id );
}

$children = (array) ( $output['evidence']['children'] ?? array() );
$processing_ids = array_map('intval', (array) ( $children['processing_job_ids'] ?? array() ));
$failed_ids = array_map('intval', (array) ( $children['failed_job_ids'] ?? array() ));
$pending_ids = array_map('intval', (array) ( $children['pending_job_ids'] ?? array() ));
$children = (array) ( $output['evidence']['children'] ?? array() );
$processing_ids = array_map('intval', (array) ( $children['processing_job_ids'] ?? array() ));
$failed_ids = array_map('intval', (array) ( $children['failed_job_ids'] ?? array() ));
$pending_ids = array_map('intval', (array) ( $children['pending_job_ids'] ?? array() ));
$undrainable_ids = array_map('intval', (array) ( $children['pending_without_drainable_action_job_ids'] ?? array() ));

if ( 'resume' === $operation ) {
$repair = \DataMachineCode\Support\SystemTaskDrainability::ensure_jobs_have_execute_step_actions($undrainable_ids);
$child_targets = array_values(array_unique(array_filter(array_merge($processing_ids, $failed_ids))));
if ( array() === $child_targets && (int) ( $repair['repaired'] ?? 0 ) > 0 ) {
return array();
}
return array() !== $child_targets ? $child_targets : array( $job_id );
}

Expand Down
205 changes: 205 additions & 0 deletions inc/Support/SystemTaskDrainability.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
<?php
/**
* Data Machine system-task drainability helpers.
*
* @package DataMachineCode\Support
*/

namespace DataMachineCode\Support;

defined('ABSPATH') || exit;

class SystemTaskDrainability {

	/**
	 * Action Scheduler hook name that this class looks for and schedules.
	 */
	private const EXECUTE_STEP_HOOK = 'datamachine_execute_step';

	/**
	 * Action Scheduler group slug used for both the lookup query and new actions.
	 */
	private const ACTION_GROUP = 'data-machine';

	/**
	 * Ensure a pending workflow job has the Action Scheduler action that drain can run.
	 *
	 * Returns early when an action already exists; otherwise tries to schedule one
	 * for the job's first step. The job's status is not checked here.
	 *
	 * @param int $job_id Data Machine job ID.
	 * @return bool True when an action already exists or was scheduled.
	 */
	public static function ensure_job_has_execute_step_action( int $job_id ): bool {
		if ( $job_id <= 0 ) {
			return false;
		}

		if ( self::job_has_execute_step_action($job_id) ) {
			return true;
		}

		return self::schedule_execute_step_action($job_id);
	}

	/**
	 * Ensure every supplied job has drainable work and return repair stats.
	 *
	 * IDs are cast to int and de-duplicated. Non-positive IDs and jobs whose status
	 * is not 'pending' are skipped without being counted as checked.
	 *
	 * @param array<int,int|string> $job_ids Data Machine job IDs.
	 * @return array{checked:int,repaired:int,unrepairable:array<int,int>}
	 */
	public static function ensure_jobs_have_execute_step_actions( array $job_ids ): array {
		$checked      = 0;
		$repaired     = 0;
		$unrepairable = array();

		foreach ( array_values(array_unique(array_map('intval', $job_ids))) as $job_id ) {
			if ( $job_id <= 0 || ! self::is_pending_job($job_id) ) {
				continue;
			}

			++$checked;

			// Already drainable: nothing to repair, and no need to query again.
			if ( self::job_has_execute_step_action($job_id) ) {
				continue;
			}

			if ( self::schedule_execute_step_action($job_id) ) {
				++$repaired;
				continue;
			}

			$unrepairable[] = $job_id;
		}

		return array(
			'checked'      => $checked,
			'repaired'     => $repaired,
			'unrepairable' => $unrepairable,
		);
	}

	/**
	 * Return pending jobs from a list that lack a drainable execute-step action.
	 *
	 * @param array<int,int|string> $job_ids Data Machine job IDs.
	 * @return array<int,int>
	 */
	public static function pending_jobs_missing_execute_step_actions( array $job_ids ): array {
		$missing = array();
		foreach ( array_values(array_unique(array_map('intval', $job_ids))) as $job_id ) {
			if ( $job_id > 0 && self::is_pending_job($job_id) && ! self::job_has_execute_step_action($job_id) ) {
				$missing[] = $job_id;
			}
		}

		return $missing;
	}

	/**
	 * Determine whether a job has a pending execute-step action scoped to it.
	 *
	 * Counts rows in the Action Scheduler actions table that have this class's hook,
	 * status 'pending', this class's group slug, and an args value containing
	 * `"job_id":<id>` followed by `,` or `}`.
	 *
	 * @param int $job_id Data Machine job ID.
	 * @return bool False when the ID is not positive, $wpdb is unusable, or no row matches.
	 */
	public static function job_has_execute_step_action( int $job_id ): bool {
		if ( $job_id <= 0 || ! self::actions_table_available() ) {
			return false;
		}

		global $wpdb;
		$actions_table = $wpdb->prefix . 'actionscheduler_actions';
		$groups_table  = $wpdb->prefix . 'actionscheduler_groups';

		// The trailing `,` / `}` delimiters stop job 12 from matching job 123.
		// NOTE(review): presumably a.args holds the JSON-encoded action arguments with an
		// integer job_id; a string-encoded ID ("job_id":"12") would not match — confirm.
		// phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching -- Operational status check for Action Scheduler work owned by Data Machine.
		$count = (int) $wpdb->get_var(
			$wpdb->prepare(
				"SELECT COUNT(*)
				FROM %i a
				INNER JOIN %i g ON g.group_id = a.group_id
				WHERE a.hook = %s
				AND a.status = 'pending'
				AND g.slug = %s
				AND (a.args LIKE %s OR a.args LIKE %s)",
				$actions_table,
				$groups_table,
				self::EXECUTE_STEP_HOOK,
				self::ACTION_GROUP,
				'%"job_id":' . $job_id . ',%',
				'%"job_id":' . $job_id . '}%'
			)
		);
		// phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching

		return $count > 0;
	}

	/**
	 * Schedule an execute-step action for the job's first step and record the repair.
	 *
	 * Does not check for an existing action; callers do that first.
	 *
	 * @param int $job_id Data Machine job ID (expected to be positive).
	 * @return bool True only when Action Scheduler returned a positive action ID.
	 */
	private static function schedule_execute_step_action( int $job_id ): bool {
		if ( ! function_exists('as_schedule_single_action') ) {
			return false;
		}

		$flow_step_id = self::first_step_id($job_id);
		if ( '' === $flow_step_id ) {
			return false;
		}

		// as_schedule_single_action() reports failure as 0, not false, so compare
		// the integer value rather than testing for false.
		$action_id = (int) as_schedule_single_action(
			time(),
			self::EXECUTE_STEP_HOOK,
			array(
				'job_id'       => $job_id,
				'flow_step_id' => $flow_step_id,
			),
			self::ACTION_GROUP
		);

		if ( $action_id <= 0 ) {
			return false;
		}

		// Leave an audit trail on the job when the engine-data helper is available.
		if ( function_exists('datamachine_merge_engine_data') ) {
			datamachine_merge_engine_data(
				$job_id,
				array(
					'drainability_repair' => array(
						'action_id'    => $action_id,
						'hook'         => self::EXECUTE_STEP_HOOK,
						'flow_step_id' => $flow_step_id,
						'repaired_at'  => gmdate('c'),
					),
				)
			);
		}

		return true;
	}

	/**
	 * Whether the job's status is exactly 'pending'.
	 *
	 * @param int $job_id Data Machine job ID.
	 * @return bool False when the job cannot be loaded.
	 */
	private static function is_pending_job( int $job_id ): bool {
		$job = self::get_job($job_id);
		return 'pending' === (string) ( $job['status'] ?? '' );
	}

	/**
	 * Load a job through the 'datamachine/get-jobs' ability.
	 *
	 * @param int $job_id Data Machine job ID.
	 * @return array<string,mixed> First job of the result, or an empty array when the
	 *                             ability is unavailable or its result is unusable.
	 */
	private static function get_job( int $job_id ): array {
		$ability = function_exists('wp_get_ability') ? wp_get_ability('datamachine/get-jobs') : null;
		if ( ! $ability ) {
			return array();
		}

		$result = $ability->execute(array( 'job_id' => $job_id ));

		// execute() can hand back a non-array (e.g. an error object); indexing that
		// would raise "Cannot use object ... as array", so treat it as "no job".
		if ( ! is_array($result) || ! ( $result['success'] ?? false ) || ! is_array($result['jobs'] ?? null) ) {
			return array();
		}

		$job = $result['jobs'][0] ?? array();
		return is_array($job) ? $job : array();
	}

	/**
	 * Resolve the first flow step ID from the job's engine data.
	 *
	 * Uses \DataMachine\Engine\ExecutionPlan when that class exists; otherwise falls
	 * back to the first key of the stored flow_config array.
	 *
	 * @param int $job_id Data Machine job ID.
	 * @return string Step ID, or '' when it cannot be determined.
	 */
	private static function first_step_id( int $job_id ): string {
		if ( ! function_exists('datamachine_get_engine_data') ) {
			return '';
		}

		$engine_data = datamachine_get_engine_data($job_id);
		$flow_config = is_array($engine_data['flow_config'] ?? null) ? $engine_data['flow_config'] : array();
		if ( array() === $flow_config ) {
			return '';
		}

		if ( class_exists('\DataMachine\Engine\ExecutionPlan') ) {
			try {
				return (string) \DataMachine\Engine\ExecutionPlan::from_flow_config($flow_config)->first_step_id();
			} catch ( \InvalidArgumentException ) {
				return '';
			}
		}

		$keys = array_keys($flow_config);
		return (string) ( $keys[0] ?? '' );
	}

	/**
	 * Whether the global $wpdb object exposes what the lookup query needs.
	 *
	 * Only checks the object's shape (prefix, get_var, prepare); it does not verify
	 * that the Action Scheduler tables exist.
	 *
	 * @return bool
	 */
	private static function actions_table_available(): bool {
		global $wpdb;
		return isset($wpdb) && is_object($wpdb) && isset($wpdb->prefix) && method_exists($wpdb, 'get_var') && method_exists($wpdb, 'prepare');
	}
}
6 changes: 6 additions & 0 deletions inc/Tasks/WorkspaceRetentionCleanupTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use DataMachine\Core\PluginSettings;
use DataMachine\Engine\AI\System\Tasks\SystemTask;
use DataMachine\Engine\Tasks\TaskScheduler;
use DataMachineCode\Support\SystemTaskDrainability;
use DataMachineCode\Workspace\Workspace;

defined('ABSPATH') || exit;
Expand Down Expand Up @@ -207,6 +208,10 @@ private function schedule_job_backed_cleanup( int $jobId, Workspace $workspace,
return new \WP_Error('cleanup_chunk_schedule_failed', 'Failed to schedule cleanup chunk jobs.', array( 'status' => 500 ));
}

$drainability = SystemTaskDrainability::ensure_jobs_have_execute_step_actions(
is_array($batch['job_ids'] ?? null) ? $batch['job_ids'] : array()
);

return array(
'success' => true,
'dry_run' => false,
Expand Down Expand Up @@ -237,6 +242,7 @@ private function schedule_job_backed_cleanup( int $jobId, Workspace $workspace,
'planned_handles' => $this->cleanup_chunk_handles($chunk_rows),
'batch_job_id' => (int) ( $batch['batch_job_id'] ?? 0 ),
'direct_job_ids' => $batch['job_ids'] ?? array(),
'drainability' => $drainability,
),
);
}
Expand Down
Loading
Loading