diff --git a/src/lock/src/Annotation/Blockable.php b/src/lock/src/Annotation/Blockable.php index 7cd030e22..51b2d74e3 100644 --- a/src/lock/src/Annotation/Blockable.php +++ b/src/lock/src/Annotation/Blockable.php @@ -22,7 +22,8 @@ public function __construct( public ?string $value = null, public int $seconds = 0, public int $ttl = 0, - public string $driver = 'default' + public string $driver = 'default', + public int $heartbeat = 0 ) { } } diff --git a/src/lock/src/Annotation/BlockableAspect.php b/src/lock/src/Annotation/BlockableAspect.php index eef17789e..00e37f598 100644 --- a/src/lock/src/Annotation/BlockableAspect.php +++ b/src/lock/src/Annotation/BlockableAspect.php @@ -39,7 +39,7 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint) $key = StringHelper::format($annotation->prefix, $arguments, $annotation->value); - return $this->lockFactory->make($key, $annotation->ttl, driver: $annotation->driver) + return $this->lockFactory->make($key, $annotation->ttl, driver: $annotation->driver, heartbeat: $annotation->heartbeat) ->block($annotation->seconds, fn () => $proceedingJoinPoint->process()); } } diff --git a/src/lock/src/Annotation/Lock.php b/src/lock/src/Annotation/Lock.php index 0e3a402ea..7340f21e9 100644 --- a/src/lock/src/Annotation/Lock.php +++ b/src/lock/src/Annotation/Lock.php @@ -21,7 +21,8 @@ public function __construct( public string $name, public int $seconds = 0, public ?string $owner = null, - public string $driver = 'default' + public string $driver = 'default', + public int $heartbeat = 0 ) { } } diff --git a/src/lock/src/Driver/AbstractLock.php b/src/lock/src/Driver/AbstractLock.php index 80fbc7708..38c3d53fc 100644 --- a/src/lock/src/Driver/AbstractLock.php +++ b/src/lock/src/Driver/AbstractLock.php @@ -11,10 +11,17 @@ namespace FriendsOfHyperf\Lock\Driver; +use FriendsOfHyperf\Lock\Exception\HeartbeatException; use FriendsOfHyperf\Lock\Exception\LockTimeoutException; +use Hyperf\Context\ApplicationContext; +use Hyperf\Contract\StdoutLoggerInterface; +use Hyperf\Coordinator\Constants; +use Hyperf\Coordinator\CoordinatorManager; +use Hyperf\Coroutine\Coroutine; use Hyperf\Stringable\Str; use Hyperf\Support\Traits\InteractsWithTime; use Override; +use Throwable; use function Hyperf\Support\now; @@ -32,12 +39,18 @@ abstract class AbstractLock implements LockInterface */ protected int $sleepMilliseconds = 250; + protected int $heartbeat = 0; + /** * Create a new lock instance. */ - public function __construct(protected string $name, protected int $seconds, ?string $owner = null) + public function __construct(protected string $name, protected int $seconds, ?string $owner = null, int $heartbeat = 0) { $this->owner = $owner ?? Str::random(); + $this->heartbeat = $heartbeat; + if ($seconds > 0 && $heartbeat > 0 && $seconds <= $heartbeat) { + throw new HeartbeatException('Heartbeat must be less than lock seconds.'); + } } /** @@ -131,6 +144,8 @@ public function isOwnedBy($owner): bool return $this->getCurrentOwner() === $owner; } + abstract protected function delayExpiration(): bool; + /** * Returns the owner value written into the driver for this lock. */ @@ -143,4 +158,35 @@ protected function isOwnedByCurrentProcess(): bool { return $this->isOwnedBy($this->owner); } + + protected function heartbeat(): bool + { + if ($this->heartbeat <= 0 || $this->seconds <= 0 || ! Coroutine::inCoroutine()) { + return true; + } + Coroutine::create(function () { + while (true) { + if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield($this->heartbeat)) { + break; + } + if ($this->heartbeat == 0) { + return; + } + if (! $this->isOwnedByCurrentProcess()) { + return; + } + try { + $this->delayExpiration(); + } catch (Throwable $throwable) { + ApplicationContext::getContainer()->get(StdoutLoggerInterface::class)?->warning($throwable); + } + } + }); + return true; + } + + protected function stopHeartbeat(): void + { + $this->heartbeat = 0; + } } diff --git a/src/lock/src/Driver/CacheLock.php b/src/lock/src/Driver/CacheLock.php index 8c6bfbd11..dd5d43efe 100644 --- a/src/lock/src/Driver/CacheLock.php +++ b/src/lock/src/Driver/CacheLock.php @@ -26,9 +26,9 @@ class CacheLock extends AbstractLock /** * Create a new lock instance. */ - public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = []) + public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = [], int $heartbeat = 0) { - parent::__construct($name, $seconds, $owner); + parent::__construct($name, $seconds, $owner, $heartbeat); $container = ApplicationContext::getContainer(); $cacheManager = $container->get(CacheManager::class); @@ -46,7 +46,7 @@ public function acquire(): bool return false; } - return $this->store->set($this->name, $this->owner, $this->seconds); + return $this->store->set($this->name, $this->owner, $this->seconds) && $this->heartbeat(); } /** @@ -56,6 +56,7 @@ public function acquire(): bool public function release(): bool { if ($this->isOwnedByCurrentProcess()) { + $this->stopHeartbeat(); return $this->store->delete($this->name); } @@ -68,9 +69,19 @@ public function release(): bool #[Override] public function forceRelease(): void { + $this->stopHeartbeat(); $this->store->delete($this->name); } + #[Override] + protected function delayExpiration(): bool + { + if ($this->seconds > 0) { + return $this->store->set($this->name, $this->owner, $this->seconds); + } + return true; + } + /** * Returns the owner value written into the driver for this lock. * @return string diff --git a/src/lock/src/Driver/CoroutineLock.php b/src/lock/src/Driver/CoroutineLock.php index 099b134d1..feebcb05c 100644 --- a/src/lock/src/Driver/CoroutineLock.php +++ b/src/lock/src/Driver/CoroutineLock.php @@ -43,12 +43,13 @@ public function __construct( string $name, int $seconds, ?string $owner = null, - array $constructor = [] + array $constructor = [], + int $heartbeat = 0 ) { $constructor = array_merge(['prefix' => ''], $constructor); $name = $constructor['prefix'] . $name; - parent::__construct($name, $seconds, $owner); + parent::__construct($name, $seconds, $owner, $heartbeat); self::$owners ??= new WeakMap(); self::$timers ??= new WeakMap(); @@ -113,6 +114,13 @@ public function forceRelease(): void $chan->close(); } + #[Override] + protected function delayExpiration(): bool + { + // Not supported + return false; + } + /** * Returns the owner value written into the driver for this lock. * @return string diff --git a/src/lock/src/Driver/DatabaseLock.php b/src/lock/src/Driver/DatabaseLock.php index 90012be7b..80e734e0a 100644 --- a/src/lock/src/Driver/DatabaseLock.php +++ b/src/lock/src/Driver/DatabaseLock.php @@ -28,9 +28,9 @@ class DatabaseLock extends AbstractLock /** * Create a new lock instance. */ - public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = []) + public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = [], int $heartbeat = 0) { - parent::__construct($name, $seconds, $owner); + parent::__construct($name, $seconds, $owner, $heartbeat); $constructor = array_merge(['pool' => 'default', 'table' => 'locks', 'prefix' => ''], $constructor); if ($constructor['prefix']) { @@ -67,7 +67,7 @@ public function acquire(): bool $acquired = $updated >= 1; } - + $acquired && $this->heartbeat(); return $acquired; } @@ -78,6 +78,7 @@ public function acquire(): bool public function release(): bool { if ($this->isOwnedByCurrentProcess()) { + $this->stopHeartbeat(); $this->connection->table($this->table) ->where('key', $this->name) ->where('owner', $this->owner) @@ -95,11 +96,29 @@ public function release(): bool #[Override] public function forceRelease(): void { + $this->stopHeartbeat(); $this->connection->table($this->table) ->where('key', $this->name) ->delete(); } + #[Override] + protected function delayExpiration(): bool + { + if ($this->seconds > 0) { + $updated = $this->connection->table($this->table) + ->where('key', $this->name) + ->where(fn ($query) => $query->where('owner', $this->owner)->orWhere('expiration', '<=', time())) + ->update([ + 'owner' => $this->owner, + 'expiration' => $this->expiresAt(), + ]); + + return $updated >= 1; + } + return true; + } + /** * Get the UNIX timestamp indicating when the lock should expire. */ diff --git a/src/lock/src/Driver/FileSystemLock.php b/src/lock/src/Driver/FileSystemLock.php index de703ec8f..f6a4524a2 100644 --- a/src/lock/src/Driver/FileSystemLock.php +++ b/src/lock/src/Driver/FileSystemLock.php @@ -26,9 +26,9 @@ class FileSystemLock extends AbstractLock /** * Create a new lock instance. */ - public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = []) + public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = [], int $heartbeat = 0) { - parent::__construct($name, $seconds, $owner); + parent::__construct($name, $seconds, $owner, $heartbeat); $constructor = array_merge(['config' => ['prefix' => 'lock:']], $constructor); $this->store = make(FileSystemDriver::class, $constructor); @@ -44,7 +44,7 @@ public function acquire(): bool return false; } - return $this->store->set($this->name, $this->owner, $this->seconds) == true; + return $this->store->set($this->name, $this->owner, $this->seconds) == true && $this->heartbeat(); } /** @@ -54,6 +54,7 @@ public function acquire(): bool public function release(): bool { if ($this->isOwnedByCurrentProcess()) { + $this->stopHeartbeat(); return $this->store->delete($this->name); } @@ -66,9 +67,19 @@ public function release(): bool #[Override] public function forceRelease(): void { + $this->stopHeartbeat(); $this->store->delete($this->name); } + #[Override] + protected function delayExpiration(): bool + { + if ($this->seconds > 0) { + return $this->store->set($this->name, $this->owner, $this->seconds); + } + return true; + } + /** * Returns the owner value written into the driver for this lock. * @return string diff --git a/src/lock/src/Driver/RedisLock.php b/src/lock/src/Driver/RedisLock.php index 11e399577..7ddc15fce 100644 --- a/src/lock/src/Driver/RedisLock.php +++ b/src/lock/src/Driver/RedisLock.php @@ -28,9 +28,9 @@ class RedisLock extends AbstractLock /** * Create a new lock instance. */ - public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = []) + public function __construct(string $name, int $seconds, ?string $owner = null, array $constructor = [], int $heartbeat = 0) { - parent::__construct($name, $seconds, $owner); + parent::__construct($name, $seconds, $owner, $heartbeat); $constructor = array_merge(['pool' => 'default', 'prefix' => ''], $constructor); if ($constructor['prefix']) { @@ -46,7 +46,7 @@ public function __construct(string $name, int $seconds, ?string $owner = null, a public function acquire(): bool { if ($this->seconds > 0) { - return $this->store->set($this->name, $this->owner, ['NX', 'EX' => $this->seconds]) == true; + return $this->store->set($this->name, $this->owner, ['NX', 'EX' => $this->seconds]) == true && $this->heartbeat(); } return $this->store->setNX($this->name, $this->owner) === true; @@ -58,6 +58,7 @@ public function acquire(): bool #[Override] public function release(): bool { + $this->stopHeartbeat(); return (bool) $this->store->eval(LuaScripts::releaseLock(), [$this->name, $this->owner], 1); } @@ -67,9 +68,19 @@ public function release(): bool #[Override] public function forceRelease(): void { + $this->stopHeartbeat(); $this->store->del($this->name); } + #[Override] + protected function delayExpiration(): bool + { + if ($this->seconds > 0) { + return $this->store->set($this->name, $this->owner, ['EX' => $this->seconds]); + } + return true; + } + /** * Returns the owner value written into the driver for this lock. * @return string diff --git a/src/lock/src/Exception/HeartbeatException.php b/src/lock/src/Exception/HeartbeatException.php new file mode 100644 index 000000000..8ecdd8932 --- /dev/null +++ b/src/lock/src/Exception/HeartbeatException.php @@ -0,0 +1,18 @@ +get(LockFactory::class); @@ -25,5 +25,5 @@ function lock(?string $name = null, int $seconds = 0, ?string $owner = null, str return $factory; } - return $factory->make($name, $seconds, $owner, $driver); + return $factory->make($name, $seconds, $owner, $driver, $heartbeat); } diff --git a/src/lock/src/Listener/RegisterPropertyHandlerListener.php b/src/lock/src/Listener/RegisterPropertyHandlerListener.php index 0ff5372a2..ce9b902ee 100644 --- a/src/lock/src/Listener/RegisterPropertyHandlerListener.php +++ b/src/lock/src/Listener/RegisterPropertyHandlerListener.php @@ -48,8 +48,9 @@ public function process(object $event): void $seconds = (int) $annotation->seconds; $owner = $annotation->owner; $driver = $annotation->driver; + $heartbeat = $annotation->heartbeat; - $reflectionProperty->setValue($object, $this->lockFactory->make($name, $seconds, $owner, $driver)); + $reflectionProperty->setValue($object, $this->lockFactory->make($name, $seconds, $owner, $driver, $heartbeat)); } }); } diff --git a/src/lock/src/LockFactory.php b/src/lock/src/LockFactory.php index a7f13aa46..3c866c234 100644 --- a/src/lock/src/LockFactory.php +++ b/src/lock/src/LockFactory.php @@ -27,7 +27,7 @@ public function __construct(private ConfigInterface $config) /** * Get a lock instance. */ - public function make(string $name, int $seconds = 0, ?string $owner = null, string $driver = 'default'): LockInterface + public function make(string $name, int $seconds = 0, ?string $owner = null, string $driver = 'default', int $heartbeat = 0): LockInterface { $driver = $driver ?: 'default'; @@ -45,6 +45,7 @@ public function make(string $name, int $seconds = 0, ?string $owner = null, stri 'seconds' => $seconds, 'owner' => $owner, 'constructor' => $constructor, + 'heartbeat' => $heartbeat, ]); } } diff --git a/tests/Lock/AbstractLockTest.php b/tests/Lock/AbstractLockTest.php index ebbe3b5d0..fa5b4f992 100644 --- a/tests/Lock/AbstractLockTest.php +++ b/tests/Lock/AbstractLockTest.php @@ -59,6 +59,11 @@ protected function getCurrentOwner(): string { return $this->currentOwner; } + + protected function delayExpiration(): bool + { + return true; + } } test('abstract lock generates random owner when none provided', function () { diff --git a/tests/Lock/FunctionsTest.php b/tests/Lock/FunctionsTest.php index f6ff6d7c7..e1a4b8642 100644 --- a/tests/Lock/FunctionsTest.php +++ b/tests/Lock/FunctionsTest.php @@ -25,7 +25,7 @@ test('lock function returns lock instance when name provided', function () { $lockInstance = $this->mock(LockInterface::class); $factory = $this->mock(LockFactory::class, function ($mock) use ($lockInstance) { - $mock->shouldReceive('make')->with('foo', 0, null, 'default')->andReturn($lockInstance); + $mock->shouldReceive('make')->with('foo', 0, null, 'default', 0)->andReturn($lockInstance); }); $this->instance(LockFactory::class, $factory); @@ -37,7 +37,7 @@ test('lock function passes all parameters correctly', function () { $lockInstance = $this->mock(LockInterface::class); $factory = $this->mock(LockFactory::class, function ($mock) use ($lockInstance) { - $mock->shouldReceive('make')->with('mylock', 60, 'owner123', 'redis')->andReturn($lockInstance); + $mock->shouldReceive('make')->with('mylock', 60, 'owner123', 'redis', 0)->andReturn($lockInstance); }); $this->instance(LockFactory::class, $factory); @@ -49,7 +49,7 @@ test('lock function uses default values correctly', function () { $lockInstance = $this->mock(LockInterface::class); $factory = $this->mock(LockFactory::class, function ($mock) use ($lockInstance) { - $mock->shouldReceive('make')->with('test', 30, null, 'default')->andReturn($lockInstance); + $mock->shouldReceive('make')->with('test', 30, null, 'default', 0)->andReturn($lockInstance); }); $this->instance(LockFactory::class, $factory);