Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions src/async-queue-closure-job/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

The async queue closure job component for Hyperf. Execute closures as background jobs with full support for dependency injection, fluent configuration.

> **Note**: Starting from v3.1.73, this package serves as a convenience wrapper around the `friendsofhyperf/support` package. The core implementation (`CallQueuedClosure` and related classes) has been moved to the support package to eliminate circular dependencies and improve package architecture.

## Installation

```shell
Expand Down Expand Up @@ -109,14 +111,39 @@ The main dispatch function that creates a closure job.
- Closures with captured variables (`use`)
- Closures with nullable parameters

## Testing
## Package Architecture

Run tests:
As of v3.1.73, this package has been refactored to improve the overall architecture:

```shell
composer test:unit -- tests/AsyncQueueClosureJob
- **Core Implementation**: Moved to `friendsofhyperf/support` package
- `FriendsOfHyperf\Support\CallQueuedClosure`
- `FriendsOfHyperf\Support\Traits\ClosureParameterInjection`
- **Convenience Layer**: This package now provides a namespace alias for easy migration
- `FriendsOfHyperf\AsyncQueueClosureJob\dispatch()` → delegates to support package

### Why This Change?

1. **Eliminates Circular Dependencies**: The support package previously depended on this package, creating a circular dependency
2. **Single Source of Truth**: Core functionality now lives in one place
3. **Simplified Dependency Tree**: Reduces maintenance overhead
4. **Non-Breaking**: Existing code continues to work without changes

### Migration Guide

**No action required!** The namespace `FriendsOfHyperf\AsyncQueueClosureJob\dispatch()` continues to work as before. However, if you want to use the new location directly:

```php
// Old (still supported)
use function FriendsOfHyperf\AsyncQueueClosureJob\dispatch;

// New (recommended for new code)
use function FriendsOfHyperf\Support\dispatch;
```

## Testing

Tests for the core functionality are now maintained in the support package.

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
Expand Down
7 changes: 1 addition & 6 deletions src/async-queue-closure-job/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
"pull-request": "https://github.com/friendsofhyperf/components/pulls"
},
"require": {
"hyperf/async-queue": "~3.1.0",
"hyperf/conditionable": "~3.1.0",
"hyperf/context": "~3.1.0",
"hyperf/di": "~3.1.0",
"hyperf/support": "~3.1.72",
"laravel/serializable-closure": "^1.0 || ^2.0"
"hyperf/support": "~3.1.73"
},
"minimum-stability": "dev",
"autoload": {
Expand Down
11 changes: 1 addition & 10 deletions src/async-queue-closure-job/src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ final class ConfigProvider
{
public function __invoke()
{
return [
'dependencies' => [],
'annotations' => [
'scan' => [
'paths' => [
__DIR__,
],
],
],
];
return [];
}
}
9 changes: 6 additions & 3 deletions src/async-queue-closure-job/src/Functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@

use Closure;
use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch;
use FriendsOfHyperf\Support\CallQueuedClosure;

use function FriendsOfHyperf\Support\dispatch as base_dispatch;

/**
* Dispatch a closure as an async queue job.
*
* @deprecated since version 3.1.73, will be removed in version 3.2, use `FriendsOfHyperf\Support\dispatch` instead.
*
* @param Closure $closure The closure to execute
* @param-closure-this CallQueuedClosure $closure
*/
function dispatch(Closure $closure): PendingAsyncQueueDispatch
{
return new PendingAsyncQueueDispatch(
CallQueuedClosure::create($closure)
);
return base_dispatch($closure);
}
2 changes: 1 addition & 1 deletion src/helpers/src/Functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
use Countable;
use DateTimeZone;
use Exception;
use FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure;
use FriendsOfHyperf\AsyncTask\Task as AsyncTask;
use FriendsOfHyperf\AsyncTask\TaskInterface as AsyncTaskInterface;
use FriendsOfHyperf\Support\CallQueuedClosure;
use FriendsOfHyperf\Support\Environment;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Amqp\Producer;
Expand Down
206 changes: 203 additions & 3 deletions src/support/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,216 @@
[![Total Downloads](https://img.shields.io/packagist/dt/friendsofhyperf/support.svg?style=flat-square)](https://packagist.org/packages/friendsofhyperf/support)
[![GitHub license](https://img.shields.io/github/license/friendsofhyperf/support)](https://github.com/friendsofhyperf/support)

Another support component for Hyperf.
A comprehensive support component for Hyperf providing essential utilities, helpers, and base classes.

## Installation
## Features

- 🎯 **Fluent Dispatch API** - Elegant job dispatch with support for async queue, AMQP, and Kafka
- 🔄 **Closure Jobs** - Execute closures as background jobs with dependency injection
- 🛠️ **Helper Functions** - Collection of useful helper functions
- 📦 **Bus System** - Pending dispatch classes for various message systems
- 🧩 **Traits & Utilities** - Reusable traits and utility classes

- Installation
## Installation

```shell
composer require friendsofhyperf/support
```

## Usage

### Dispatch Helper Function

The `dispatch()` helper function provides a fluent API for dispatching jobs to different systems:

#### Async Queue (Closure Jobs)

```php
use function FriendsOfHyperf\Support\dispatch;

// Simple closure dispatch to async queue
dispatch(function () {
// Your job logic here
logger()->info('Job executed!');
});

// With configuration
dispatch(function () {
// Your job logic here
})
->onConnection('high-priority')
->delay(60) // Execute after 60 seconds
->setMaxAttempts(5);

// With dependency injection
dispatch(function (UserService $userService, LoggerInterface $logger) {
$users = $userService->getActiveUsers();
$logger->info('Processing ' . count($users) . ' users');
});
```
Comment on lines +29 to +53
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

文档与实现不一致:方法名错误。

示例代码中使用了 onConnection() 方法(第 44 行),但根据测试文件 tests/Support/DispatchTest.php 的实际实现,PendingAsyncQueueDispatch 使用的是 onPool() 方法,而非 onConnection()

测试文件中的证据:

  • 第 113 行:$pending->onPool('custom-pool')
  • 第 154 行:->onPool('high-priority')
  • 第 192 行:->onPool('test-pool')

应用此修改以纠正方法名:

 dispatch(function () {
     // Your job logic here
 })
-    ->onConnection('high-priority')
+    ->onPool('high-priority')
     ->delay(60) // Execute after 60 seconds
     ->setMaxAttempts(5);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#### Async Queue (Closure Jobs)
```php
use function FriendsOfHyperf\Support\dispatch;
// Simple closure dispatch to async queue
dispatch(function () {
// Your job logic here
logger()->info('Job executed!');
});
// With configuration
dispatch(function () {
// Your job logic here
})
->onConnection('high-priority')
->delay(60) // Execute after 60 seconds
->setMaxAttempts(5);
// With dependency injection
dispatch(function (UserService $userService, LoggerInterface $logger) {
$users = $userService->getActiveUsers();
$logger->info('Processing ' . count($users) . ' users');
});
```
#### Async Queue (Closure Jobs)
🤖 Prompt for AI Agents
In src/support/README.md around lines 29 to 53, the documentation uses the wrong
method name: it calls onConnection() at line 44 while the implementation/tests
expect onPool(). Update the README examples to use ->onPool('...') wherever
onConnection() appears (including the high-priority example) so the docs match
PendingAsyncQueueDispatch's API and the tests.


#### AMQP Producer Messages

```php
use Hyperf\Amqp\Message\ProducerMessageInterface;
use function FriendsOfHyperf\Support\dispatch;

// Dispatch AMQP message
dispatch($amqpMessage)
->setPool('default')
->setExchange('my.exchange')
->setRoutingKey('my.routing.key')
->setTimeout(10)
->setConfirm(true);
```

#### Kafka Producer Messages

```php
use Hyperf\Kafka\Producer\ProduceMessage;
use function FriendsOfHyperf\Support\dispatch;

// Dispatch Kafka message
dispatch($kafkaMessage)
->setPool('default');
```

### CallQueuedClosure

The `CallQueuedClosure` class allows you to execute closures as async queue jobs:

```php
use FriendsOfHyperf\Support\CallQueuedClosure;

// Create a closure job
$job = CallQueuedClosure::create(function () {
// Your job logic
return 'Job completed!';
});

// Configure max attempts
$job->setMaxAttempts(3);

// The job can be pushed to queue manually or via dispatch()
```

### Pending Dispatch Classes

#### PendingAsyncQueueDispatch

Fluent API for async queue job dispatch:

```php
use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch;

$pending = new PendingAsyncQueueDispatch($job);
$pending
->onConnection('default')
->delay(30)
->when($condition, function ($dispatch) {
$dispatch->onConnection('special');
})
->unless($otherCondition, function ($dispatch) {
$dispatch->delay(60);
});
// Job is dispatched when object is destroyed
```
Comment on lines +102 to +120
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

文档与实现不一致:方法名错误。

示例中使用了 onConnection() 方法(第 111、114 行),但实际实现使用的是 onPool() 方法。

应用此修改以纠正方法名:

 $pending = new PendingAsyncQueueDispatch($job);
 $pending
-    ->onConnection('default')
+    ->onPool('default')
     ->delay(30)
     ->when($condition, function ($dispatch) {
-        $dispatch->onConnection('special');
+        $dispatch->onPool('special');
     })
     ->unless($otherCondition, function ($dispatch) {
         $dispatch->delay(60);
     });
🤖 Prompt for AI Agents
In src/support/README.md around lines 102 to 120 the example uses the
non-existent method onConnection (lines ~111 and ~114) while the implementation
exposes onPool; update the documentation example to call onPool instead of
onConnection (replace both occurrences) so the README matches the actual API,
and verify any surrounding text references the connection naming to reflect
"pool" terminology.


#### PendingAmqpProducerMessageDispatch

Fluent API for AMQP message dispatch:

```php
use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch;

$pending = new PendingAmqpProducerMessageDispatch($message);
$pending
->setPool('default')
->setExchange('my.exchange')
->setRoutingKey('my.routing.key')
->setTimeout(5)
->setConfirm(true);
// Message is sent when object is destroyed
```

#### PendingKafkaProducerMessageDispatch

Fluent API for Kafka message dispatch:

```php
use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch;

$pending = new PendingKafkaProducerMessageDispatch($message);
$pending->setPool('default');
// Message is sent when object is destroyed
```

### Conditional Execution

All pending dispatch classes support conditional execution:

```php
use function FriendsOfHyperf\Support\dispatch;

dispatch($job)
->when($shouldUseHighPriority, function ($dispatch) {
$dispatch->onConnection('high-priority');
})
->unless($isTestMode, function ($dispatch) {
$dispatch->delay(10);
});
```
Comment on lines +151 to +165
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

文档与实现不一致:方法名错误。

条件执行示例中使用了 onConnection() 方法(第 160 行),但应为 onPool()

应用此修改:

 dispatch($job)
     ->when($shouldUseHighPriority, function ($dispatch) {
-        $dispatch->onConnection('high-priority');
+        $dispatch->onPool('high-priority');
     })
     ->unless($isTestMode, function ($dispatch) {
         $dispatch->delay(10);
     });
🤖 Prompt for AI Agents
In src/support/README.md around lines 151 to 165, the conditional execution
example uses the wrong method name: it calls onConnection() (line ~160) but the
implementation expects onPool(); update the example to call onPool() instead and
ensure the surrounding text/example remains consistent with the library API.


## API Reference

### dispatch($job)

Creates a pending dispatch instance based on the job type:

- `Closure``PendingAsyncQueueDispatch` with `CallQueuedClosure`
- `ProducerMessageInterface``PendingAmqpProducerMessageDispatch`
- `ProduceMessage``PendingKafkaProducerMessageDispatch`
- Other objects → `PendingAsyncQueueDispatch`

### PendingAsyncQueueDispatch Methods

- `onConnection(string $connection): static` - Set queue connection
- `delay(int $delay): static` - Delay job execution (seconds)
- `setMaxAttempts(int $attempts): static` - Set max retry attempts
- `when(mixed $condition, callable $callback): static` - Conditional execution
- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution

Comment on lines +178 to +185
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

API 文档错误:方法名不匹配。

第 180 行将方法文档化为 onConnection(string $connection),但根据测试文件的实际实现,该方法应为 onPool(string $pool)

应用此修改:

 ### PendingAsyncQueueDispatch Methods
 
-- `onConnection(string $connection): static` - Set queue connection
+- `onPool(string $pool): static` - Set queue pool
 - `delay(int $delay): static` - Delay job execution (seconds)
 - `setMaxAttempts(int $attempts): static` - Set max retry attempts
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
### PendingAsyncQueueDispatch Methods
- `onConnection(string $connection): static` - Set queue connection
- `delay(int $delay): static` - Delay job execution (seconds)
- `setMaxAttempts(int $attempts): static` - Set max retry attempts
- `when(mixed $condition, callable $callback): static` - Conditional execution
- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution
### PendingAsyncQueueDispatch Methods
- `onPool(string $pool): static` - Set queue pool
- `delay(int $delay): static` - Delay job execution (seconds)
- `setMaxAttempts(int $attempts): static` - Set max retry attempts
- `when(mixed $condition, callable $callback): static` - Conditional execution
- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution
🤖 Prompt for AI Agents
In src/support/README.md around lines 178 to 185, the API docs incorrectly list
onConnection(string $connection): static; update the doc to match the
implementation and tests by renaming the method to onPool(string $pool): static
and adjust the parameter name and description accordingly (e.g., "Set queue
pool") so the README reflects the real method signature and behavior.

### PendingAmqpProducerMessageDispatch Methods

- `setPool(string $pool): static` - Set AMQP pool name
- `setExchange(string $exchange): static` - Set exchange name
- `setRoutingKey(array|string $routingKey): static` - Set routing key(s)
- `setTimeout(int $timeout): static` - Set timeout (seconds)
- `setConfirm(bool $confirm): static` - Enable/disable confirm mode
- `when(mixed $condition, callable $callback): static` - Conditional execution
- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution

Comment on lines +186 to +195
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

验证 AMQP 方法的可用性。

文档记录了 setExchange()setRoutingKey() 方法(第 189-190 行),这些是 PR 中新增的增强功能。但是,第 188 行文档化为 setPool(),而测试文件显示实际使用的是 onPool() 方法(tests/Support/DispatchTest.php 第 227 行)。

请确认:

  1. setExchange()setRoutingKey() 方法确实已在 PendingAmqpProducerMessageDispatch 中实现
  2. 池配置方法应为 onPool() 还是 setPool()

运行以下脚本验证方法签名:


🏁 Script executed:

#!/bin/bash
# 描述:验证 PendingAmqpProducerMessageDispatch 类的方法签名

# 查找类定义及其所有公共方法
ast-grep --pattern $'class PendingAmqpProducerMessageDispatch {
  $$$
}'

# 搜索 setExchange 方法
rg -nP --type=php -C3 'function\s+setExchange\s*\('

# 搜索 setRoutingKey 方法  
rg -nP --type=php -C3 'function\s+setRoutingKey\s*\('

# 搜索 setPool 或 onPool 方法
rg -nP --type=php -C3 'function\s+(setPool|onPool)\s*\(' -g '*PendingAmqpProducerMessageDispatch.php'

Length of output: 8260


更新 README 文档中的池配置方法名称。

验证确认 setExchange()setRoutingKey() 方法已正确实现,但第 188 行的文档存在错误。实现中的方法名为 onPool()(位于 src/support/src/Bus/PendingAmqpProducerMessageDispatch.php 第 50 行),而不是 setPool()

需要修正:

  • 第 188 行:setPool(string $pool): staticonPool(string $pool): static
🤖 Prompt for AI Agents
In src/support/README.md around lines 186 to 195, the documented pool
configuration method name is incorrect: update line 188 to replace
"setPool(string $pool): static" with "onPool(string $pool): static"; also verify
the implementation signature in
src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (around line 50)
matches onPool(string $pool): static and adjust the README only if the
implementation is correct.

### PendingKafkaProducerMessageDispatch Methods

- `setPool(string $pool): static` - Set Kafka pool name
- `when(mixed $condition, callable $callback): static` - Conditional execution
- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution

### CallQueuedClosure

- `create(Closure $closure): static` - Create a new closure job
- `setMaxAttempts(int $attempts): void` - Set max retry attempts
- `handle(): mixed` - Execute the closure (called by queue worker)

## Architecture Notes

As of v3.1.73, this package includes the core async queue closure functionality:

- Previously in `friendsofhyperf/async-queue-closure-job`
- Moved here to eliminate circular dependencies
- The `async-queue-closure-job` package now depends on this package
- All functionality remains backward compatible

## Contact

- [Twitter](https://twitter.com/huangdijia)
Expand Down
2 changes: 1 addition & 1 deletion src/support/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"pull-request": "https://github.com/friendsofhyperf/components/pulls"
},
"require": {
"friendsofhyperf/async-queue-closure-job": "~3.1.0",
"hyperf/collection": "~3.1.0",
"hyperf/context": "~3.1.0",
"hyperf/di": "~3.1.0",
Expand All @@ -29,6 +28,7 @@
"hyperf/stringable": "~3.1.0",
"hyperf/support": "~3.1.0",
"hyperf/tappable": "~3.1.0",
"laravel/serializable-closure": "^1.0 || ^2.0",
"nesbot/carbon": "^2.0 || ^3.0",
"symfony/polyfill-php85": "^1.33"
},
Expand Down
3 changes: 2 additions & 1 deletion src/support/src/AsyncQueue/ClosureJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
namespace FriendsOfHyperf\Support\AsyncQueue;

use Closure;
use FriendsOfHyperf\Support\CallQueuedClosure;
use Laravel\SerializableClosure\SerializableClosure;

/**
* @deprecated since v3.2, will be removed in v3.2, use `FriendsOfHyperf\AsyncQueueClosureJob\ClosureJob` instead.
*/
class ClosureJob extends \FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure
class ClosureJob extends CallQueuedClosure
{
public function __construct(Closure $closure, int $maxAttempts = 0)
{
Expand Down
Loading