From 0e08cbc5e61054bed83b2c64dda003e5cc3a34a3 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 15:58:26 +0800 Subject: [PATCH 01/10] refactor: improve server mutex implementation and configuration - Change server mutex prefix from APP_ENV to APP_NAME for better identification - Remove readonly properties from Consumer class to allow dependency injection flexibility - Improve RedisServerMutex constructor with better name construction and default owner handling - Add getName() method to ServerMutexInterface for consistency - Create ServerMutexCommand for mutex management operations This refactor improves the server mutex functionality by: - Using application name instead of environment for mutex prefix - Making Consumer properties mutable for better DI container compatibility - Enhancing RedisServerMutex with proper name formatting and owner handling - Providing interface consistency with getName() method --- src/trigger/publish/trigger.php | 2 +- .../src/Command/ServerMutexCommand.php | 25 ++++++++++++ src/trigger/src/Consumer.php | 40 +++++++++++-------- src/trigger/src/Mutex/RedisServerMutex.php | 22 ++++++---- .../src/Mutex/ServerMutexInterface.php | 2 + 5 files changed, 65 insertions(+), 26 deletions(-) create mode 100644 src/trigger/src/Command/ServerMutexCommand.php diff --git a/src/trigger/publish/trigger.php b/src/trigger/publish/trigger.php index 832855ef1..656686d64 100644 --- a/src/trigger/publish/trigger.php +++ b/src/trigger/publish/trigger.php @@ -31,7 +31,7 @@ 'server_mutex' => [ 'enable' => true, - 'prefix' => env('APP_ENV', 'dev') . '_', + 'prefix' => env('APP_NAME', 'trigger') . ':', 'expires' => 30, 'keepalive_interval' => 10, 'retry_interval' => 10, diff --git a/src/trigger/src/Command/ServerMutexCommand.php b/src/trigger/src/Command/ServerMutexCommand.php new file mode 100644 index 000000000..f13af9f28 --- /dev/null +++ b/src/trigger/src/Command/ServerMutexCommand.php @@ -0,0 +1,25 @@ +name = $options['name'] ?? sprintf('trigger.%s', $this->connection); $this->identifier = $options['identifier'] ?? sprintf('trigger.%s', $this->connection); $this->config = new Config($options); $this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, ['consumer' => $this]); - $this->healthMonitor = $this->config->get('health_monitor.enable', true) ? make(HealthMonitor::class, ['consumer' => $this]) : null; - $this->serverMutex = $this->config->get('server_mutex.enable', true) ? make(ServerMutexInterface::class, [ - 'name' => 'trigger:mutex:' . $this->connection, - 'owner' => Util::getInternalIp(), - 'options' => $this->config->get('server_mutex', []) + ['connection' => $this->connection], - 'logger' => $this->logger, - ]) : null; + + if ($this->config->get('health_monitor.enable', true)) { + $this->healthMonitor = make(HealthMonitor::class, ['consumer' => $this]); + } + + if ($this->config->get('server_mutex.enable', true)) { + $this->serverMutex = make(ServerMutexInterface::class, [ + 'connection' => $this->connection, + 'owner' => Util::getInternalIp(), + 'options' => (array) $this->config->get('server_mutex', []), + 'logger' => $this->logger, + ]); + } } public function start(): void diff --git a/src/trigger/src/Mutex/RedisServerMutex.php b/src/trigger/src/Mutex/RedisServerMutex.php index adaf19f90..57fba1b99 100644 --- a/src/trigger/src/Mutex/RedisServerMutex.php +++ b/src/trigger/src/Mutex/RedisServerMutex.php @@ -31,24 +31,30 @@ class RedisServerMutex implements ServerMutexInterface protected int $retryInterval = 10; - protected string $connection = 'default'; + protected string $name = ''; public function __construct( protected Redis $redis, - protected ?string $name = null, + protected ?string $connection = null, protected ?string $owner = null, array $options = [], protected ?LoggerInterface $logger = null ) { + $this->name = sprintf( + '%s%s', + $options['prefix'] ?? 'trigger:mutex:', + $this->connection + ); + $this->owner = $owner ?? Util::getInternalIp(); $this->expires = (int) ($options['expires'] ?? 60); $this->keepaliveInterval = (int) ($options['keepalive_interval'] ?? 10); - $this->name = ($options['prefix'] ?? '') . ($name ?? sprintf('trigger:server:%s', $this->connection)); - $this->owner = $owner ?? Util::getInternalIp(); - if (isset($options['connection'])) { - $this->connection = $options['connection']; - } - $this->timer = new Timer($this->logger); $this->retryInterval = (int) ($options['retry_interval'] ?? 10); + $this->timer = new Timer($this->logger); + } + + public function getName(): string + { + return $this->name; } public function attempt(?callable $callback = null): void diff --git a/src/trigger/src/Mutex/ServerMutexInterface.php b/src/trigger/src/Mutex/ServerMutexInterface.php index 208b2e96d..94dad2d46 100644 --- a/src/trigger/src/Mutex/ServerMutexInterface.php +++ b/src/trigger/src/Mutex/ServerMutexInterface.php @@ -13,6 +13,8 @@ interface ServerMutexInterface { + public function getName(): string; + public function attempt(?callable $callback = null); public function release(bool $force = false); From 89e29bdcf409f087ea2c97c8abc23491fde10fdc Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:05:24 +0800 Subject: [PATCH 02/10] refactor: enhance ServerMutexCommand for improved action handling and configuration --- .../src/Command/ServerMutexCommand.php | 57 +++++++++++++++++-- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/src/trigger/src/Command/ServerMutexCommand.php b/src/trigger/src/Command/ServerMutexCommand.php index f13af9f28..bd6c05b4a 100644 --- a/src/trigger/src/Command/ServerMutexCommand.php +++ b/src/trigger/src/Command/ServerMutexCommand.php @@ -11,15 +11,64 @@ namespace FriendsOfHyperf\Trigger\Command; -use Hyperf\Command\Command; +use FriendsOfHyperf\Trigger\Mutex\ServerMutexInterface; +use Hyperf\Contract\ConfigInterface; +use Hyperf\Redis\Redis; +use Psr\Container\ContainerInterface; -class ServerMutexCommand extends Command +use function Hyperf\Collection\collect; +use function Hyperf\Support\make; + +class ServerMutexCommand extends \Hyperf\Command\Command { - protected ?string $signature = 'trigger:server-mutex {--C|connection= : connection} {--L|list : list all server mutexes.}'; + protected ?string $signature = 'trigger:server-mutex {action : list|release} {--C|connection=default : The connection name}'; - protected string $description = 'Server mutex management.'; + public function __construct(protected ContainerInterface $container) + { + return parent::__construct(); + } public function handle() { + $action = $this->input->getArgument('action'); + $redis = $this->container->get(Redis::class); + $config = $this->container->get(ConfigInterface::class); + + if ($action === 'list') { + $headers = ['Connection', 'Holder', 'Owner', 'Expires At']; + $mutexes = collect($config->get('trigger.connections', [])) + ->reject(function ($config, $connection) { + return ! $config['server_mutex']['enable']; + }) + ->transform(function ($config, $connection) use ($redis) { + $mutex = make(ServerMutexInterface::class, [ + 'name' => 'trigger:mutex:' . $connection, + 'options' => $config['server_mutex'] ?? [] + ['connection' => $connection], + ]); + + $holder = (fn () => $this->name ?? 'unknown')->call($mutex); + $expiresAt = $redis->ttl($holder); + $owner = $redis->get($holder) ?? 'none'; + + return [$connection, $holder, $owner, $expiresAt > 0 ? date('Y-m-d H:i:s', time() + $expiresAt) : 'expired']; + }) + ->toArray(); + + $this->table($headers, $mutexes); + return; + } + if ($action === 'release') { + $connection = $this->input->getOption('connection'); + $options = $config->get("trigger.connections.{$connection}.server_mutex", []); + $mutex = make(ServerMutexInterface::class, [ + 'name' => 'trigger:mutex:' . $connection, + 'options' => $options + ['connection' => $connection], + ]); + $mutex->release(true); + $this->line("Released mutex for connection: {$connection}"); + return; + } + + $this->error('Invalid action. Use "list" or "release".'); } } From 85163b84823fb463ddc7579f2e3d45ec5ab9592f Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:26:55 +0800 Subject: [PATCH 03/10] refactor: update ServerMutexCommand and RedisServerMutex for improved clarity and functionality --- .../src/Command/ServerMutexCommand.php | 26 ++++++++++++------- src/trigger/src/Consumer.php | 2 +- src/trigger/src/Mutex/RedisServerMutex.php | 2 +- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/trigger/src/Command/ServerMutexCommand.php b/src/trigger/src/Command/ServerMutexCommand.php index bd6c05b4a..f7f2ec19a 100644 --- a/src/trigger/src/Command/ServerMutexCommand.php +++ b/src/trigger/src/Command/ServerMutexCommand.php @@ -35,36 +35,44 @@ public function handle() $config = $this->container->get(ConfigInterface::class); if ($action === 'list') { - $headers = ['Connection', 'Holder', 'Owner', 'Expires At']; + $headers = ['Connection', 'Name', 'Owner', 'Expires At']; $mutexes = collect($config->get('trigger.connections', [])) ->reject(function ($config, $connection) { return ! $config['server_mutex']['enable']; }) ->transform(function ($config, $connection) use ($redis) { $mutex = make(ServerMutexInterface::class, [ - 'name' => 'trigger:mutex:' . $connection, - 'options' => $config['server_mutex'] ?? [] + ['connection' => $connection], + 'connection' => $connection, + 'options' => $config['server_mutex'] ?? [], ]); - $holder = (fn () => $this->name ?? 'unknown')->call($mutex); - $expiresAt = $redis->ttl($holder); - $owner = $redis->get($holder) ?? 'none'; + $name = $mutex->getName(); + $ttl = $redis->ttl($name); + $owner = $redis->get($name) ?? 'none'; - return [$connection, $holder, $owner, $expiresAt > 0 ? date('Y-m-d H:i:s', time() + $expiresAt) : 'expired']; + return [$connection, $name, $owner, $ttl > 0 ? date('Y-m-d H:i:s', time() + $ttl) : 'expired']; }) ->toArray(); $this->table($headers, $mutexes); return; } + if ($action === 'release') { $connection = $this->input->getOption('connection'); + + if (! $connection) { + $this->error('Please specify the connection name using --connection option.'); + return; + } + $options = $config->get("trigger.connections.{$connection}.server_mutex", []); $mutex = make(ServerMutexInterface::class, [ - 'name' => 'trigger:mutex:' . $connection, - 'options' => $options + ['connection' => $connection], + 'connection' => $connection, + 'options' => $options, ]); $mutex->release(true); + $this->line("Released mutex for connection: {$connection}"); return; } diff --git a/src/trigger/src/Consumer.php b/src/trigger/src/Consumer.php index 3f699a054..f45933fb3 100644 --- a/src/trigger/src/Consumer.php +++ b/src/trigger/src/Consumer.php @@ -62,8 +62,8 @@ public function __construct( if ($this->config->get('server_mutex.enable', true)) { $this->serverMutex = make(ServerMutexInterface::class, [ 'connection' => $this->connection, - 'owner' => Util::getInternalIp(), 'options' => (array) $this->config->get('server_mutex', []), + 'owner' => Util::getInternalIp(), 'logger' => $this->logger, ]); } diff --git a/src/trigger/src/Mutex/RedisServerMutex.php b/src/trigger/src/Mutex/RedisServerMutex.php index 57fba1b99..38d60a1a3 100644 --- a/src/trigger/src/Mutex/RedisServerMutex.php +++ b/src/trigger/src/Mutex/RedisServerMutex.php @@ -36,8 +36,8 @@ class RedisServerMutex implements ServerMutexInterface public function __construct( protected Redis $redis, protected ?string $connection = null, - protected ?string $owner = null, array $options = [], + protected ?string $owner = null, protected ?LoggerInterface $logger = null ) { $this->name = sprintf( From d2bcc79b7d23e776fa7442d22f281b94e8287524 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:28:25 +0800 Subject: [PATCH 04/10] Update --- src/trigger/src/Command/ServerMutexCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trigger/src/Command/ServerMutexCommand.php b/src/trigger/src/Command/ServerMutexCommand.php index f7f2ec19a..d96dd97dc 100644 --- a/src/trigger/src/Command/ServerMutexCommand.php +++ b/src/trigger/src/Command/ServerMutexCommand.php @@ -25,7 +25,7 @@ class ServerMutexCommand extends \Hyperf\Command\Command public function __construct(protected ContainerInterface $container) { - return parent::__construct(); + parent::__construct(); } public function handle() From a06ede6aa2631b394b8c733d73561512b3cd0d24 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:30:34 +0800 Subject: [PATCH 05/10] refactor: improve server mutex filtering logic to handle missing configuration --- src/trigger/src/Command/ServerMutexCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trigger/src/Command/ServerMutexCommand.php b/src/trigger/src/Command/ServerMutexCommand.php index d96dd97dc..c3e0c5b0e 100644 --- a/src/trigger/src/Command/ServerMutexCommand.php +++ b/src/trigger/src/Command/ServerMutexCommand.php @@ -38,7 +38,7 @@ public function handle() $headers = ['Connection', 'Name', 'Owner', 'Expires At']; $mutexes = collect($config->get('trigger.connections', [])) ->reject(function ($config, $connection) { - return ! $config['server_mutex']['enable']; + return ! ($config['server_mutex']['enable'] ?? true); }) ->transform(function ($config, $connection) use ($redis) { $mutex = make(ServerMutexInterface::class, [ From dc188a625fc22bcba01a2e0921099b0261d78d7d Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:32:15 +0800 Subject: [PATCH 06/10] refactor: add ServerMutexCommand to the command list in ConfigProvider --- src/trigger/src/ConfigProvider.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/trigger/src/ConfigProvider.php b/src/trigger/src/ConfigProvider.php index a0c83ef42..746cf8731 100644 --- a/src/trigger/src/ConfigProvider.php +++ b/src/trigger/src/ConfigProvider.php @@ -28,6 +28,7 @@ public function __invoke(): array ], 'commands' => [ Command\ConsumeCommand::class, + Command\ServerMutexCommand::class, Command\SubscribersCommand::class, Command\TriggersCommand::class, ], From cabc4a2502253aa3d6b13e55fd6532466f6b1f95 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:34:26 +0800 Subject: [PATCH 07/10] refactor: improve variable naming in ServerMutexCommand for clarity --- src/trigger/src/Command/ServerMutexCommand.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/trigger/src/Command/ServerMutexCommand.php b/src/trigger/src/Command/ServerMutexCommand.php index c3e0c5b0e..51a163f4f 100644 --- a/src/trigger/src/Command/ServerMutexCommand.php +++ b/src/trigger/src/Command/ServerMutexCommand.php @@ -40,10 +40,10 @@ public function handle() ->reject(function ($config, $connection) { return ! ($config['server_mutex']['enable'] ?? true); }) - ->transform(function ($config, $connection) use ($redis) { + ->transform(function ($connectionConfig, $connection) use ($redis) { $mutex = make(ServerMutexInterface::class, [ 'connection' => $connection, - 'options' => $config['server_mutex'] ?? [], + 'options' => $connectionConfig['server_mutex'] ?? [], ]); $name = $mutex->getName(); From c34725bc1ce896b620b1f11b062009e8057bf3a7 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:36:21 +0800 Subject: [PATCH 08/10] refactor: set default value for connection in RedisServerMutex constructor --- src/trigger/src/Mutex/RedisServerMutex.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trigger/src/Mutex/RedisServerMutex.php b/src/trigger/src/Mutex/RedisServerMutex.php index 38d60a1a3..22d797ee3 100644 --- a/src/trigger/src/Mutex/RedisServerMutex.php +++ b/src/trigger/src/Mutex/RedisServerMutex.php @@ -35,7 +35,7 @@ class RedisServerMutex implements ServerMutexInterface public function __construct( protected Redis $redis, - protected ?string $connection = null, + protected string $connection = 'default', array $options = [], protected ?string $owner = null, protected ?LoggerInterface $logger = null From cbadf68f180ea092aff6544ad864ed97ec3c9da2 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:38:58 +0800 Subject: [PATCH 09/10] refactor: update RedisServerMutex constructor to conditionally set options --- src/trigger/src/Mutex/RedisServerMutex.php | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/trigger/src/Mutex/RedisServerMutex.php b/src/trigger/src/Mutex/RedisServerMutex.php index 22d797ee3..d90d21c68 100644 --- a/src/trigger/src/Mutex/RedisServerMutex.php +++ b/src/trigger/src/Mutex/RedisServerMutex.php @@ -46,9 +46,15 @@ public function __construct( $this->connection ); $this->owner = $owner ?? Util::getInternalIp(); - $this->expires = (int) ($options['expires'] ?? 60); - $this->keepaliveInterval = (int) ($options['keepalive_interval'] ?? 10); - $this->retryInterval = (int) ($options['retry_interval'] ?? 10); + if (isset($options['expires'])) { + $this->expires = (int) $options['expires']; + } + if (isset($options['keepalive_interval'])) { + $this->keepaliveInterval = (int) $options['keepalive_interval']; + } + if (isset($options['retry_interval'])) { + $this->retryInterval = (int) $options['retry_interval']; + } $this->timer = new Timer($this->logger); } From e8a9f60be304e46e985c46625c834c2cd46d55c7 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:39:20 +0800 Subject: [PATCH 10/10] refactor: move owner assignment in RedisServerMutex constructor for clarity --- src/trigger/src/Mutex/RedisServerMutex.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/trigger/src/Mutex/RedisServerMutex.php b/src/trigger/src/Mutex/RedisServerMutex.php index d90d21c68..c0b417808 100644 --- a/src/trigger/src/Mutex/RedisServerMutex.php +++ b/src/trigger/src/Mutex/RedisServerMutex.php @@ -45,7 +45,6 @@ public function __construct( $options['prefix'] ?? 'trigger:mutex:', $this->connection ); - $this->owner = $owner ?? Util::getInternalIp(); if (isset($options['expires'])) { $this->expires = (int) $options['expires']; } @@ -55,6 +54,7 @@ public function __construct( if (isset($options['retry_interval'])) { $this->retryInterval = (int) $options['retry_interval']; } + $this->owner = $owner ?? Util::getInternalIp(); $this->timer = new Timer($this->logger); }