diff --git a/src/trigger/publish/trigger.php b/src/trigger/publish/trigger.php index 832855ef1..656686d64 100644 --- a/src/trigger/publish/trigger.php +++ b/src/trigger/publish/trigger.php @@ -31,7 +31,7 @@ 'server_mutex' => [ 'enable' => true, - 'prefix' => env('APP_ENV', 'dev') . '_', + 'prefix' => env('APP_NAME', 'trigger') . ':', 'expires' => 30, 'keepalive_interval' => 10, 'retry_interval' => 10, diff --git a/src/trigger/src/Command/ServerMutexCommand.php b/src/trigger/src/Command/ServerMutexCommand.php new file mode 100644 index 000000000..51a163f4f --- /dev/null +++ b/src/trigger/src/Command/ServerMutexCommand.php @@ -0,0 +1,82 @@ +input->getArgument('action'); + $redis = $this->container->get(Redis::class); + $config = $this->container->get(ConfigInterface::class); + + if ($action === 'list') { + $headers = ['Connection', 'Name', 'Owner', 'Expires At']; + $mutexes = collect($config->get('trigger.connections', [])) + ->reject(function ($config, $connection) { + return ! ($config['server_mutex']['enable'] ?? true); + }) + ->transform(function ($connectionConfig, $connection) use ($redis) { + $mutex = make(ServerMutexInterface::class, [ + 'connection' => $connection, + 'options' => $connectionConfig['server_mutex'] ?? [], + ]); + + $name = $mutex->getName(); + $ttl = $redis->ttl($name); + $owner = $redis->get($name) ?? 'none'; + + return [$connection, $name, $owner, $ttl > 0 ? date('Y-m-d H:i:s', time() + $ttl) : 'expired']; + }) + ->toArray(); + + $this->table($headers, $mutexes); + return; + } + + if ($action === 'release') { + $connection = $this->input->getOption('connection'); + + if (! $connection) { + $this->error('Please specify the connection name using --connection option.'); + return; + } + + $options = $config->get("trigger.connections.{$connection}.server_mutex", []); + $mutex = make(ServerMutexInterface::class, [ + 'connection' => $connection, + 'options' => $options, + ]); + $mutex->release(true); + + $this->line("Released mutex for connection: {$connection}"); + return; + } + + $this->error('Invalid action. Use "list" or "release".'); + } +} diff --git a/src/trigger/src/ConfigProvider.php b/src/trigger/src/ConfigProvider.php index a0c83ef42..746cf8731 100644 --- a/src/trigger/src/ConfigProvider.php +++ b/src/trigger/src/ConfigProvider.php @@ -28,6 +28,7 @@ public function __invoke(): array ], 'commands' => [ Command\ConsumeCommand::class, + Command\ServerMutexCommand::class, Command\SubscribersCommand::class, Command\TriggersCommand::class, ], diff --git a/src/trigger/src/Consumer.php b/src/trigger/src/Consumer.php index 6bd8b975b..f45933fb3 100644 --- a/src/trigger/src/Consumer.php +++ b/src/trigger/src/Consumer.php @@ -28,39 +28,45 @@ class Consumer { - public readonly Config $config; + public Config $config; - public readonly string $name; + public string $name; - public readonly string $identifier; + public string $identifier; - public readonly ?HealthMonitor $healthMonitor; + public ?HealthMonitor $healthMonitor = null; - public readonly ?ServerMutexInterface $serverMutex; + public ?ServerMutexInterface $serverMutex = null; - public readonly BinLogCurrentSnapshotInterface $binLogCurrentSnapshot; + public BinLogCurrentSnapshotInterface $binLogCurrentSnapshot; private bool $stopped = false; public function __construct( - protected readonly SubscriberManager $subscriberManager, - protected readonly TriggerManager $triggerManager, - public readonly string $connection = 'default', + protected SubscriberManager $subscriberManager, + protected TriggerManager $triggerManager, + public string $connection = 'default', array $options = [], - public readonly ?LoggerInterface $logger = null + public ?LoggerInterface $logger = null ) { $this->name = $options['name'] ?? sprintf('trigger.%s', $this->connection); $this->identifier = $options['identifier'] ?? sprintf('trigger.%s', $this->connection); $this->config = new Config($options); $this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, ['consumer' => $this]); - $this->healthMonitor = $this->config->get('health_monitor.enable', true) ? make(HealthMonitor::class, ['consumer' => $this]) : null; - $this->serverMutex = $this->config->get('server_mutex.enable', true) ? make(ServerMutexInterface::class, [ - 'name' => 'trigger:mutex:' . $this->connection, - 'owner' => Util::getInternalIp(), - 'options' => $this->config->get('server_mutex', []) + ['connection' => $this->connection], - 'logger' => $this->logger, - ]) : null; + + if ($this->config->get('health_monitor.enable', true)) { + $this->healthMonitor = make(HealthMonitor::class, ['consumer' => $this]); + } + + if ($this->config->get('server_mutex.enable', true)) { + $this->serverMutex = make(ServerMutexInterface::class, [ + 'connection' => $this->connection, + 'options' => (array) $this->config->get('server_mutex', []), + 'owner' => Util::getInternalIp(), + 'logger' => $this->logger, + ]); + } } public function start(): void diff --git a/src/trigger/src/Mutex/RedisServerMutex.php b/src/trigger/src/Mutex/RedisServerMutex.php index adaf19f90..c0b417808 100644 --- a/src/trigger/src/Mutex/RedisServerMutex.php +++ b/src/trigger/src/Mutex/RedisServerMutex.php @@ -31,24 +31,36 @@ class RedisServerMutex implements ServerMutexInterface protected int $retryInterval = 10; - protected string $connection = 'default'; + protected string $name = ''; public function __construct( protected Redis $redis, - protected ?string $name = null, - protected ?string $owner = null, + protected string $connection = 'default', array $options = [], + protected ?string $owner = null, protected ?LoggerInterface $logger = null ) { - $this->expires = (int) ($options['expires'] ?? 60); - $this->keepaliveInterval = (int) ($options['keepalive_interval'] ?? 10); - $this->name = ($options['prefix'] ?? '') . ($name ?? sprintf('trigger:server:%s', $this->connection)); - $this->owner = $owner ?? Util::getInternalIp(); - if (isset($options['connection'])) { - $this->connection = $options['connection']; + $this->name = sprintf( + '%s%s', + $options['prefix'] ?? 'trigger:mutex:', + $this->connection + ); + if (isset($options['expires'])) { + $this->expires = (int) $options['expires']; + } + if (isset($options['keepalive_interval'])) { + $this->keepaliveInterval = (int) $options['keepalive_interval']; + } + if (isset($options['retry_interval'])) { + $this->retryInterval = (int) $options['retry_interval']; } + $this->owner = $owner ?? Util::getInternalIp(); $this->timer = new Timer($this->logger); - $this->retryInterval = (int) ($options['retry_interval'] ?? 10); + } + + public function getName(): string + { + return $this->name; } public function attempt(?callable $callback = null): void diff --git a/src/trigger/src/Mutex/ServerMutexInterface.php b/src/trigger/src/Mutex/ServerMutexInterface.php index 208b2e96d..94dad2d46 100644 --- a/src/trigger/src/Mutex/ServerMutexInterface.php +++ b/src/trigger/src/Mutex/ServerMutexInterface.php @@ -13,6 +13,8 @@ interface ServerMutexInterface { + public function getName(): string; + public function attempt(?callable $callback = null); public function release(bool $force = false);