diff --git a/inc/Cli/Commands/DrainCommand.php b/inc/Cli/Commands/DrainCommand.php index 22c625ac6..0f67b67cd 100644 --- a/inc/Cli/Commands/DrainCommand.php +++ b/inc/Cli/Commands/DrainCommand.php @@ -46,6 +46,10 @@ class DrainCommand extends BaseCommand { * default: 0 * --- * + * [--job-id=] + * : Optional comma-separated Data Machine job IDs to drain. Useful when + * unrelated due work is blocked ahead of a known cleanup or retry run. + * * [--format=] * : Output format. * --- @@ -71,6 +75,7 @@ public function __invoke( array $args, array $assoc_args ): void { 'limit' => isset( $assoc_args['limit'] ) ? (int) $assoc_args['limit'] : 0, 'batch_size' => isset( $assoc_args['batch-size'] ) ? (int) $assoc_args['batch-size'] : 25, 'time_limit' => isset( $assoc_args['time-limit'] ) ? (int) $assoc_args['time-limit'] : 0, + 'job_ids' => isset( $assoc_args['job-id'] ) ? (string) $assoc_args['job-id'] : '', ) ); @@ -89,7 +94,7 @@ public function __invoke( array $args, array $assoc_args ): void { * so this drain runs concrete due action IDs from that same group instead of * a hand-maintained hook allow-list. * - * @param array{limit?:int,batch_size?:int,time_limit?:int,hooks?:string[]} $options Drain options. + * @param array{limit?:int,batch_size?:int,time_limit?:int,hooks?:string[],job_ids?:string|int[]} $options Drain options. * @return array Drain stats. */ public static function drain( array $options = array() ): array { @@ -97,14 +102,15 @@ public static function drain( array $options = array() ): array { $batch_size = max( 1, (int) ( $options['batch_size'] ?? 25 ) ); $time_limit = max( 0, (int) ( $options['time_limit'] ?? 0 ) ); $hooks = self::normalizeHooks( $options['hooks'] ?? null ); + $job_ids = self::normalizeJobIds( $options['job_ids'] ?? null ); $started_at = time(); - $before_counts = self::getStatusCounts( $hooks ); + $before_counts = self::getStatusCounts( $hooks, $job_ids ); $batches = 0; $warnings = 0; - while ( self::getDuePendingCount( $hooks ) > 0 ) { - $stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks ), $batches, $warnings, $hooks ); + while ( self::getDuePendingCount( $hooks, $job_ids ) > 0 ) { + $stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids ); if ( $limit > 0 && (int) $stats['actions_processed'] >= $limit ) { break; } @@ -121,13 +127,13 @@ public static function drain( array $options = array() ): array { break; } - $action_ids = self::getDuePendingActionIds( $current_batch_size, $hooks ); + $action_ids = self::getDuePendingActionIds( $current_batch_size, $hooks, $job_ids ); if ( empty( $action_ids ) ) { break; } - $due_before = self::getDuePendingCount( $hooks ); - $status_before = self::getStatusCounts( $hooks ); + $due_before = self::getDuePendingCount( $hooks, $job_ids ); + $status_before = self::getStatusCounts( $hooks, $job_ids ); $result = self::runActionSchedulerActions( $action_ids ); ++$batches; @@ -138,31 +144,33 @@ public static function drain( array $options = array() ): array { break; } - $status_after = self::getStatusCounts( $hooks ); + $status_after = self::getStatusCounts( $hooks, $job_ids ); $progress = self::processedDelta( $status_before, $status_after ); - if ( 0 === $progress && self::getDuePendingCount( $hooks ) >= $due_before ) { + if ( 0 === $progress && self::getDuePendingCount( $hooks, $job_ids ) >= $due_before ) { ++$warnings; WP_CLI::warning( 'Drain stopped because Action Scheduler made no observable progress.' ); break; } } - return self::buildStats( $before_counts, self::getStatusCounts( $hooks ), $batches, $warnings, $hooks ); + return self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids ); } /** * Return a read-only Data Machine Action Scheduler status snapshot. * - * @param array{hooks?:string[]} $options Status options. + * @param array{hooks?:string[],job_ids?:string|int[]} $options Status options. * @return array Status stats. */ public static function status( array $options = array() ): array { - $hooks = self::normalizeHooks( $options['hooks'] ?? null ); + $hooks = self::normalizeHooks( $options['hooks'] ?? null ); + $job_ids = self::normalizeJobIds( $options['job_ids'] ?? null ); return array( - 'due_pending' => self::getDuePendingCount( $hooks ), - 'total_pending' => self::getPendingCount( $hooks ), - 'hooks' => implode( ',', array_keys( self::getStatusCounts( $hooks ) ) ), + 'due_pending' => self::getDuePendingCount( $hooks, $job_ids ), + 'total_pending' => self::getPendingCount( $hooks, $job_ids ), + 'hooks' => implode( ',', array_keys( self::getStatusCounts( $hooks, $job_ids ) ) ), + 'job_ids' => implode( ',', $job_ids ), ); } @@ -205,7 +213,7 @@ private static function runActionSchedulerActions( array $action_ids ): object { * @param int $warnings Warning count. * @return array Stats. */ - private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null ): array { + private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null, array $job_ids = array() ): array { $batch_completed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'complete' ); $batch_failed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'failed' ); $step_completed = self::delta( $before_counts, $after_counts, self::HOOK_EXECUTE_STEP, 'complete' ); @@ -227,10 +235,11 @@ private static function buildStats( array $before_counts, array $after_counts, i 'step_execution_failures' => $step_failed, 'actions_processed' => $total_processed, 'other_actions' => max( 0, $total_processed - $tracked_processed ), - 'remaining_pending' => self::getDuePendingCount( $hooks ), - 'total_pending' => self::getPendingCount( $hooks ), + 'remaining_pending' => self::getDuePendingCount( $hooks, $job_ids ), + 'total_pending' => self::getPendingCount( $hooks, $job_ids ), 'warnings' => $warnings, 'hooks' => implode( ',', self::processedHooks( $before_counts, $after_counts ) ), + 'job_ids' => implode( ',', $job_ids ), ); } @@ -312,23 +321,60 @@ private static function normalizeHooks( mixed $hooks ): ?array { return empty( $hooks ) ? null : $hooks; } + /** + * Normalize an optional job-id scope. + * + * @param mixed $job_ids Optional comma-separated string or ID list. + * @return int[] Job IDs. + */ + private static function normalizeJobIds( mixed $job_ids ): array { + if ( is_string( $job_ids ) ) { + $job_ids = preg_split( '/\s*,\s*/', $job_ids, -1, PREG_SPLIT_NO_EMPTY ); + } + + if ( ! is_array( $job_ids ) ) { + return array(); + } + + $normalized = array(); + foreach ( $job_ids as $job_id ) { + $job_id = absint( $job_id ); + if ( $job_id > 0 ) { + $normalized[] = $job_id; + } + } + + return array_values( array_unique( $normalized ) ); + } + /** * Build optional hook WHERE SQL for prepared Action Scheduler queries. * * @param string[]|null $hooks Hook scope, or null for all hooks in the group. * @return array{sql:string,values:string[]} SQL fragment and placeholder values. */ - private static function hookWhereSql( ?array $hooks ): array { - if ( empty( $hooks ) ) { - return array( - 'sql' => '', - 'values' => array(), - ); + private static function hookWhereSql( ?array $hooks, array $job_ids = array() ): array { + $values = array(); + $sql = ''; + + if ( ! empty( $hooks ) ) { + $sql .= 'a.hook IN (' . implode( ', ', array_fill( 0, count( $hooks ), '%s' ) ) . ') AND '; + $values = array_merge( $values, $hooks ); + } + + if ( ! empty( $job_ids ) ) { + $clauses = array(); + foreach ( $job_ids as $job_id ) { + $clauses[] = '(a.args LIKE %s OR a.args LIKE %s)'; + $values[] = '%"job_id":' . $job_id . ',%'; + $values[] = '%"job_id":' . $job_id . '}%'; + } + $sql .= '(' . implode( ' OR ', $clauses ) . ') AND '; } return array( - 'sql' => 'a.hook IN (' . implode( ', ', array_fill( 0, count( $hooks ), '%s' ) ) . ') AND ', - 'values' => $hooks, + 'sql' => $sql, + 'values' => $values, ); } @@ -350,8 +396,8 @@ private static function delta( array $before_counts, array $after_counts, string * * @return int Due pending count. */ - private static function getDuePendingCount( ?array $hooks = null ): int { - return self::countActions( true, $hooks ); + private static function getDuePendingCount( ?array $hooks = null, array $job_ids = array() ): int { + return self::countActions( true, $hooks, $job_ids ); } /** @@ -359,8 +405,8 @@ private static function getDuePendingCount( ?array $hooks = null ): int { * * @return int Pending count. */ - private static function getPendingCount( ?array $hooks = null ): int { - return self::countActions( false, $hooks ); + private static function getPendingCount( ?array $hooks = null, array $job_ids = array() ): int { + return self::countActions( false, $hooks, $job_ids ); } /** @@ -369,12 +415,12 @@ private static function getPendingCount( ?array $hooks = null ): int { * @param int $limit Maximum IDs to return. * @return int[] Action IDs. */ - private static function getDuePendingActionIds( int $limit, ?array $hooks = null ): array { + private static function getDuePendingActionIds( int $limit, ?array $hooks = null, array $job_ids = array() ): array { global $wpdb; $actions_table = $wpdb->prefix . 'actionscheduler_actions'; $groups_table = $wpdb->prefix . 'actionscheduler_groups'; - $hook_sql = self::hookWhereSql( $hooks ); + $hook_sql = self::hookWhereSql( $hooks, $job_ids ); $values = array_merge( array( $actions_table, $groups_table ), $hook_sql['values'], @@ -406,12 +452,12 @@ private static function getDuePendingActionIds( int $limit, ?array $hooks = null * @param bool $due_only Whether to count only due actions. * @return int Pending count. */ - private static function countActions( bool $due_only, ?array $hooks = null ): int { + private static function countActions( bool $due_only, ?array $hooks = null, array $job_ids = array() ): int { global $wpdb; $actions_table = $wpdb->prefix . 'actionscheduler_actions'; $groups_table = $wpdb->prefix . 'actionscheduler_groups'; - $hook_sql = self::hookWhereSql( $hooks ); + $hook_sql = self::hookWhereSql( $hooks, $job_ids ); if ( $due_only ) { $values = array_merge( @@ -460,12 +506,12 @@ private static function countActions( bool $due_only, ?array $hooks = null ): in * * @return array> Counts by hook and status. */ - private static function getStatusCounts( ?array $hooks = null ): array { + private static function getStatusCounts( ?array $hooks = null, array $job_ids = array() ): array { global $wpdb; $actions_table = $wpdb->prefix . 'actionscheduler_actions'; $groups_table = $wpdb->prefix . 'actionscheduler_groups'; - $hook_sql = self::hookWhereSql( $hooks ); + $hook_sql = self::hookWhereSql( $hooks, $job_ids ); $values = array_merge( array( $actions_table, $groups_table ), $hook_sql['values'], diff --git a/tests/flow-run-cli-drain-smoke.php b/tests/flow-run-cli-drain-smoke.php index 8210694af..507786cdf 100644 --- a/tests/flow-run-cli-drain-smoke.php +++ b/tests/flow-run-cli-drain-smoke.php @@ -39,7 +39,10 @@ function assert_drain_contains( string $needle, string $haystack, string $messag assert_drain_contains( "WP_CLI::add_command( 'datamachine drain'", $boot_src, 'first-class datamachine drain command is registered' ); assert_drain_contains( "datamachine_pipeline_batch_chunk'", $drain_src, 'drain includes pipeline batch chunk hook' ); assert_drain_contains( "datamachine_execute_step'", $drain_src, 'drain includes execute step hook' ); -assert_drain_contains( 'hookWhereSql( $hooks )', $drain_src, 'drain supports an optional hook scope' ); +assert_drain_contains( '[--job-id=]', $drain_src, 'drain documents optional job-id scope' ); +assert_drain_contains( 'normalizeJobIds', $drain_src, 'drain normalizes optional job-id scope' ); +assert_drain_contains( 'hookWhereSql( $hooks, $job_ids )', $drain_src, 'drain supports optional hook and job-id scopes' ); +assert_drain_contains( 'a.args LIKE %s', $drain_src, 'drain can filter pending actions by serialized job_id args' ); assert_drain_contains( "a.status = \\'pending\\'", $drain_src, 'drain queries pending actions in the Data Machine group' ); assert_drain_contains( 'getDuePendingActionIds', $drain_src, 'drain queries concrete due Data Machine action IDs' ); assert_drain_contains( "\\ActionScheduler::runner()", $drain_src, 'drain runs concrete action IDs through Action Scheduler runner' );