Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 82 additions & 36 deletions inc/Cli/Commands/DrainCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class DrainCommand extends BaseCommand {
* default: 0
* ---
*
* [--job-id=<ids>]
* : Optional comma-separated Data Machine job IDs to drain. Useful when
* unrelated due work is blocked ahead of a known cleanup or retry run.
*
* [--format=<format>]
* : Output format.
* ---
Expand All @@ -71,6 +75,7 @@ public function __invoke( array $args, array $assoc_args ): void {
'limit' => isset( $assoc_args['limit'] ) ? (int) $assoc_args['limit'] : 0,
'batch_size' => isset( $assoc_args['batch-size'] ) ? (int) $assoc_args['batch-size'] : 25,
'time_limit' => isset( $assoc_args['time-limit'] ) ? (int) $assoc_args['time-limit'] : 0,
'job_ids' => isset( $assoc_args['job-id'] ) ? (string) $assoc_args['job-id'] : '',
)
);

Expand All @@ -89,22 +94,23 @@ public function __invoke( array $args, array $assoc_args ): void {
* so this drain runs concrete due action IDs from that same group instead of
* a hand-maintained hook allow-list.
*
* @param array{limit?:int,batch_size?:int,time_limit?:int,hooks?:string[]} $options Drain options.
* @param array{limit?:int,batch_size?:int,time_limit?:int,hooks?:string[],job_ids?:string|int[]} $options Drain options.
* @return array<string,int|string> Drain stats.
*/
public static function drain( array $options = array() ): array {
$limit = max( 0, (int) ( $options['limit'] ?? 0 ) );
$batch_size = max( 1, (int) ( $options['batch_size'] ?? 25 ) );
$time_limit = max( 0, (int) ( $options['time_limit'] ?? 0 ) );
$hooks = self::normalizeHooks( $options['hooks'] ?? null );
$job_ids = self::normalizeJobIds( $options['job_ids'] ?? null );

$started_at = time();
$before_counts = self::getStatusCounts( $hooks );
$before_counts = self::getStatusCounts( $hooks, $job_ids );
$batches = 0;
$warnings = 0;

while ( self::getDuePendingCount( $hooks ) > 0 ) {
$stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks ), $batches, $warnings, $hooks );
while ( self::getDuePendingCount( $hooks, $job_ids ) > 0 ) {
$stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids );
if ( $limit > 0 && (int) $stats['actions_processed'] >= $limit ) {
break;
}
Expand All @@ -121,13 +127,13 @@ public static function drain( array $options = array() ): array {
break;
}

$action_ids = self::getDuePendingActionIds( $current_batch_size, $hooks );
$action_ids = self::getDuePendingActionIds( $current_batch_size, $hooks, $job_ids );
if ( empty( $action_ids ) ) {
break;
}

$due_before = self::getDuePendingCount( $hooks );
$status_before = self::getStatusCounts( $hooks );
$due_before = self::getDuePendingCount( $hooks, $job_ids );
$status_before = self::getStatusCounts( $hooks, $job_ids );
$result = self::runActionSchedulerActions( $action_ids );
++$batches;

Expand All @@ -138,31 +144,33 @@ public static function drain( array $options = array() ): array {
break;
}

$status_after = self::getStatusCounts( $hooks );
$status_after = self::getStatusCounts( $hooks, $job_ids );
$progress = self::processedDelta( $status_before, $status_after );
if ( 0 === $progress && self::getDuePendingCount( $hooks ) >= $due_before ) {
if ( 0 === $progress && self::getDuePendingCount( $hooks, $job_ids ) >= $due_before ) {
++$warnings;
WP_CLI::warning( 'Drain stopped because Action Scheduler made no observable progress.' );
break;
}
}

return self::buildStats( $before_counts, self::getStatusCounts( $hooks ), $batches, $warnings, $hooks );
return self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids );
}

/**
* Return a read-only Data Machine Action Scheduler status snapshot.
*
* @param array{hooks?:string[]} $options Status options.
* @param array{hooks?:string[],job_ids?:string|int[]} $options Status options.
* @return array<string,int|string> Status stats.
*/
public static function status( array $options = array() ): array {
$hooks = self::normalizeHooks( $options['hooks'] ?? null );
$hooks = self::normalizeHooks( $options['hooks'] ?? null );
$job_ids = self::normalizeJobIds( $options['job_ids'] ?? null );

return array(
'due_pending' => self::getDuePendingCount( $hooks ),
'total_pending' => self::getPendingCount( $hooks ),
'hooks' => implode( ',', array_keys( self::getStatusCounts( $hooks ) ) ),
'due_pending' => self::getDuePendingCount( $hooks, $job_ids ),
'total_pending' => self::getPendingCount( $hooks, $job_ids ),
'hooks' => implode( ',', array_keys( self::getStatusCounts( $hooks, $job_ids ) ) ),
'job_ids' => implode( ',', $job_ids ),
);
}

Expand Down Expand Up @@ -205,7 +213,7 @@ private static function runActionSchedulerActions( array $action_ids ): object {
* @param int $warnings Warning count.
* @return array<string,int|string> Stats.
*/
private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null ): array {
private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null, array $job_ids = array() ): array {
$batch_completed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'complete' );
$batch_failed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'failed' );
$step_completed = self::delta( $before_counts, $after_counts, self::HOOK_EXECUTE_STEP, 'complete' );
Expand All @@ -227,10 +235,11 @@ private static function buildStats( array $before_counts, array $after_counts, i
'step_execution_failures' => $step_failed,
'actions_processed' => $total_processed,
'other_actions' => max( 0, $total_processed - $tracked_processed ),
'remaining_pending' => self::getDuePendingCount( $hooks ),
'total_pending' => self::getPendingCount( $hooks ),
'remaining_pending' => self::getDuePendingCount( $hooks, $job_ids ),
'total_pending' => self::getPendingCount( $hooks, $job_ids ),
'warnings' => $warnings,
'hooks' => implode( ',', self::processedHooks( $before_counts, $after_counts ) ),
'job_ids' => implode( ',', $job_ids ),
);
}

Expand Down Expand Up @@ -312,23 +321,60 @@ private static function normalizeHooks( mixed $hooks ): ?array {
return empty( $hooks ) ? null : $hooks;
}

/**
* Normalize an optional job-id scope.
*
* @param mixed $job_ids Optional comma-separated string or ID list.
* @return int[] Job IDs.
*/
private static function normalizeJobIds( mixed $job_ids ): array {
if ( is_string( $job_ids ) ) {
$job_ids = preg_split( '/\s*,\s*/', $job_ids, -1, PREG_SPLIT_NO_EMPTY );
}

if ( ! is_array( $job_ids ) ) {
return array();
}

$normalized = array();
foreach ( $job_ids as $job_id ) {
$job_id = absint( $job_id );
if ( $job_id > 0 ) {
$normalized[] = $job_id;
}
}

return array_values( array_unique( $normalized ) );
}

/**
* Build optional hook WHERE SQL for prepared Action Scheduler queries.
*
* @param string[]|null $hooks Hook scope, or null for all hooks in the group.
* @return array{sql:string,values:string[]} SQL fragment and placeholder values.
*/
private static function hookWhereSql( ?array $hooks ): array {
if ( empty( $hooks ) ) {
return array(
'sql' => '',
'values' => array(),
);
private static function hookWhereSql( ?array $hooks, array $job_ids = array() ): array {
$values = array();
$sql = '';

if ( ! empty( $hooks ) ) {
$sql .= 'a.hook IN (' . implode( ', ', array_fill( 0, count( $hooks ), '%s' ) ) . ') AND ';
$values = array_merge( $values, $hooks );
}

if ( ! empty( $job_ids ) ) {
$clauses = array();
foreach ( $job_ids as $job_id ) {
$clauses[] = '(a.args LIKE %s OR a.args LIKE %s)';
$values[] = '%"job_id":' . $job_id . ',%';
$values[] = '%"job_id":' . $job_id . '}%';
}
$sql .= '(' . implode( ' OR ', $clauses ) . ') AND ';
}

return array(
'sql' => 'a.hook IN (' . implode( ', ', array_fill( 0, count( $hooks ), '%s' ) ) . ') AND ',
'values' => $hooks,
'sql' => $sql,
'values' => $values,
);
}

Expand All @@ -350,17 +396,17 @@ private static function delta( array $before_counts, array $after_counts, string
*
* @return int Due pending count.
*/
private static function getDuePendingCount( ?array $hooks = null ): int {
return self::countActions( true, $hooks );
private static function getDuePendingCount( ?array $hooks = null, array $job_ids = array() ): int {
return self::countActions( true, $hooks, $job_ids );
}

/**
* Count all pending Data Machine actions.
*
* @return int Pending count.
*/
private static function getPendingCount( ?array $hooks = null ): int {
return self::countActions( false, $hooks );
private static function getPendingCount( ?array $hooks = null, array $job_ids = array() ): int {
return self::countActions( false, $hooks, $job_ids );
}

/**
Expand All @@ -369,12 +415,12 @@ private static function getPendingCount( ?array $hooks = null ): int {
* @param int $limit Maximum IDs to return.
* @return int[] Action IDs.
*/
private static function getDuePendingActionIds( int $limit, ?array $hooks = null ): array {
private static function getDuePendingActionIds( int $limit, ?array $hooks = null, array $job_ids = array() ): array {
global $wpdb;

$actions_table = $wpdb->prefix . 'actionscheduler_actions';
$groups_table = $wpdb->prefix . 'actionscheduler_groups';
$hook_sql = self::hookWhereSql( $hooks );
$hook_sql = self::hookWhereSql( $hooks, $job_ids );
$values = array_merge(
array( $actions_table, $groups_table ),
$hook_sql['values'],
Expand Down Expand Up @@ -406,12 +452,12 @@ private static function getDuePendingActionIds( int $limit, ?array $hooks = null
* @param bool $due_only Whether to count only due actions.
* @return int Pending count.
*/
private static function countActions( bool $due_only, ?array $hooks = null ): int {
private static function countActions( bool $due_only, ?array $hooks = null, array $job_ids = array() ): int {
global $wpdb;

$actions_table = $wpdb->prefix . 'actionscheduler_actions';
$groups_table = $wpdb->prefix . 'actionscheduler_groups';
$hook_sql = self::hookWhereSql( $hooks );
$hook_sql = self::hookWhereSql( $hooks, $job_ids );

if ( $due_only ) {
$values = array_merge(
Expand Down Expand Up @@ -460,12 +506,12 @@ private static function countActions( bool $due_only, ?array $hooks = null ): in
*
* @return array<string,array<string,int>> Counts by hook and status.
*/
private static function getStatusCounts( ?array $hooks = null ): array {
private static function getStatusCounts( ?array $hooks = null, array $job_ids = array() ): array {
global $wpdb;

$actions_table = $wpdb->prefix . 'actionscheduler_actions';
$groups_table = $wpdb->prefix . 'actionscheduler_groups';
$hook_sql = self::hookWhereSql( $hooks );
$hook_sql = self::hookWhereSql( $hooks, $job_ids );
$values = array_merge(
array( $actions_table, $groups_table ),
$hook_sql['values'],
Expand Down
5 changes: 4 additions & 1 deletion tests/flow-run-cli-drain-smoke.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ function assert_drain_contains( string $needle, string $haystack, string $messag
assert_drain_contains( "WP_CLI::add_command( 'datamachine drain'", $boot_src, 'first-class datamachine drain command is registered' );
assert_drain_contains( "datamachine_pipeline_batch_chunk'", $drain_src, 'drain includes pipeline batch chunk hook' );
assert_drain_contains( "datamachine_execute_step'", $drain_src, 'drain includes execute step hook' );
assert_drain_contains( 'hookWhereSql( $hooks )', $drain_src, 'drain supports an optional hook scope' );
assert_drain_contains( '[--job-id=<ids>]', $drain_src, 'drain documents optional job-id scope' );
assert_drain_contains( 'normalizeJobIds', $drain_src, 'drain normalizes optional job-id scope' );
assert_drain_contains( 'hookWhereSql( $hooks, $job_ids )', $drain_src, 'drain supports optional hook and job-id scopes' );
assert_drain_contains( 'a.args LIKE %s', $drain_src, 'drain can filter pending actions by serialized job_id args' );
assert_drain_contains( "a.status = \\'pending\\'", $drain_src, 'drain queries pending actions in the Data Machine group' );
assert_drain_contains( 'getDuePendingActionIds', $drain_src, 'drain queries concrete due Data Machine action IDs' );
assert_drain_contains( "\\ActionScheduler::runner()", $drain_src, 'drain runs concrete action IDs through Action Scheduler runner' );
Expand Down
Loading