From 8b629b6829697ee6bdf4edf0eb1733123826318c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 02:07:08 +0100 Subject: [PATCH 01/22] extract distributed mutex from redis --- ...tRedlockMutex.php => DistributedMutex.php} | 32 +------ src/Mutex/RedisMutex.php | 88 +++++++++++++------ ...MutexTest.php => DistributedMutexTest.php} | 24 ++--- 3 files changed, 76 insertions(+), 68 deletions(-) rename src/Mutex/{AbstractRedlockMutex.php => DistributedMutex.php} (82%) rename tests/Mutex/{AbstractRedlockMutexTest.php => DistributedMutexTest.php} (92%) diff --git a/src/Mutex/AbstractRedlockMutex.php b/src/Mutex/DistributedMutex.php similarity index 82% rename from src/Mutex/AbstractRedlockMutex.php rename to src/Mutex/DistributedMutex.php index 5929e00..64331a5 100644 --- a/src/Mutex/AbstractRedlockMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -14,13 +14,9 @@ /** * Distributed mutex based on the Redlock algorithm. * - * @template TClient of object - * - * @internal - * - * @see https://redis.io/topics/distlock#the-redlock-algorithm + * @see http://redis.io/topics/distlock#the-redlock-algorithm */ -abstract class AbstractRedlockMutex extends AbstractSpinlockWithTokenMutex implements LoggerAwareInterface +class DistributedMutex extends AbstractSpinlockWithTokenMutex implements LoggerAwareInterface { use LoggerAwareTrait; @@ -46,13 +42,14 @@ public function __construct(array $clients, string $name, float $acquireTimeout #[\Override] protected function acquireWithToken(string $key, float $expireTimeout) { + $token = LockUtil::getInstance()->makeRandomToken(); + // 1. This differs from the specification to avoid an overflow on 32-Bit systems. $startTs = microtime(true); // 2. $acquired = 0; $errored = 0; - $token = LockUtil::getInstance()->makeRandomToken(); $exception = null; foreach ($this->clients as $index => $client) { try { @@ -145,25 +142,4 @@ private function isMajority(int $count): bool { return $count > count($this->clients) / 2; } - - /** - * Sets the key only if such key doesn't exist at the server yet. - * - * @param TClient $client - * @param float $expire The TTL seconds - * - * @return bool True if the key was set - */ - abstract protected function add(object $client, string $key, string $value, float $expire): bool; - - /** - * @param TClient $client - * @param list $keys - * @param list $arguments - * - * @return mixed The script result, or false if executing failed - * - * @throws LockReleaseException An unexpected error happened - */ - abstract protected function evalScript(object $client, string $luaScript, array $keys, array $arguments); } diff --git a/src/Mutex/RedisMutex.php b/src/Mutex/RedisMutex.php index b154ace..0641be8 100644 --- a/src/Mutex/RedisMutex.php +++ b/src/Mutex/RedisMutex.php @@ -11,30 +11,67 @@ use Predis\PredisException; /** - * Distributed mutex based on the Redlock algorithm supporting the phpredis extension and Predis API. + * Redis based spinlock implementation supporting the phpredis extension and Predis API. * * @phpstan-type TClient \Redis|\RedisCluster|PredisClientInterface - * - * @extends AbstractRedlockMutex - * - * @see https://redis.io/topics/distlock#the-redlock-algorithm */ -class RedisMutex extends AbstractRedlockMutex +class RedisMutex extends AbstractSpinlockWithTokenMutex { + /** @var TClient */ + private object $client; + + /** + * The Redis instance needs to be connected. I.e. Redis::connect() was called already. + * + * @param TClient $client + * @param float $acquireTimeout In seconds + * @param float $expireTimeout In seconds + */ + public function __construct(object $client, string $name, float $acquireTimeout = 3, float $expireTimeout = \INF) + { + parent::__construct($name, $acquireTimeout, $expireTimeout); + + $this->client = $client; + } + /** * @param TClient $client * - * @phpstan-assert-if-true \Redis|\RedisCluster $client + * @phpstan-assert-if-true \Redis|\RedisCluster $this->client */ - private function isClientPHPRedis(object $client): bool + private function isClientPHPRedis(): bool { - $res = $client instanceof \Redis || $client instanceof \RedisCluster; + $res = $this->client instanceof \Redis || $this->client instanceof \RedisCluster; - \assert($res === !$client instanceof PredisClientInterface); + \assert($res === !$this->client instanceof PredisClientInterface); return $res; } + #[\Override] + protected function acquireWithToken(string $key, float $expireTimeout) + { + $token = LockUtil::getInstance()->makeRandomToken(); + + return $this->add($key, $token, $expireTimeout) + ? $token + : false; + } + + #[\Override] + protected function releaseWithToken(string $key, string $token): bool + { + $script = <<<'LUA' + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + LUA; + + return $this->evalScript($script, [$key], [$token]); + } + private function makeRedisExpireTimeoutMillis(float $value): int { $res = LockUtil::getInstance()->castFloatToInt(ceil($value * 1000)); @@ -60,15 +97,14 @@ private function makeRedisExpireTimeoutMillis(float $value): int /** * @throws LockAcquireException */ - #[\Override] - protected function add(object $client, string $key, string $value, float $expire): bool + protected function add(string $key, string $value, float $expire): bool { $expireTimeoutMillis = $this->makeRedisExpireTimeoutMillis($expire); - if ($this->isClientPHPRedis($client)) { + if ($this->isClientPHPRedis()) { try { // Will set the key, if it doesn't exist, with a ttl of $expire seconds - return $client->set($key, $value, ['nx', 'px' => $expireTimeoutMillis]); + return $this->client->set($key, $value, ['nx', 'px' => $expireTimeoutMillis]); } catch (\RedisException $e) { $message = sprintf( 'Failed to acquire lock for key \'%s\'', @@ -79,7 +115,7 @@ protected function add(object $client, string $key, string $value, float $expire } } else { try { - return $client->set($key, $value, 'PX', $expireTimeoutMillis, 'NX') !== null; + return $this->client->set($key, $value, 'PX', $expireTimeoutMillis, 'NX') !== null; } catch (PredisException $e) { $message = sprintf( 'Failed to acquire lock for key \'%s\'', @@ -91,24 +127,23 @@ protected function add(object $client, string $key, string $value, float $expire } } - #[\Override] - protected function evalScript(object $client, string $luaScript, array $keys, array $arguments) + protected function evalScript(string $luaScript, array $keys, array $arguments) { - if ($this->isClientPHPRedis($client)) { - $arguments = array_map(function ($v) use ($client) { + if ($this->isClientPHPRedis()) { + $arguments = array_map(function ($v) { /* * If a serialization mode such as "php" or "igbinary" is enabled, the arguments must be * serialized by us, because phpredis does not do this for the eval command. * * The keys must not be serialized. */ - $v = $client->_serialize($v); + $v = $this->client->_serialize($v); /* * If LZF compression is enabled for the redis connection and the runtime has the LZF * extension installed, compress the arguments as the final step. */ - if ($this->isLzfCompressionEnabled($client)) { + if ($this->isLzfCompressionEnabled()) { $v = lzf_compress($v); } @@ -116,28 +151,25 @@ protected function evalScript(object $client, string $luaScript, array $keys, ar }, $arguments); try { - return $client->eval($luaScript, [...$keys, ...$arguments], count($keys)); + return $this->client->eval($luaScript, [...$keys, ...$arguments], count($keys)); } catch (\RedisException $e) { throw new LockReleaseException('Failed to release lock', 0, $e); } } else { try { - return $client->eval($luaScript, count($keys), ...$keys, ...$arguments); + return $this->client->eval($luaScript, count($keys), ...$keys, ...$arguments); } catch (PredisException $e) { throw new LockReleaseException('Failed to release lock', 0, $e); } } } - /** - * @param \Redis|\RedisCluster $client - */ - private function isLzfCompressionEnabled(object $client): bool + private function isLzfCompressionEnabled(): bool { if (!\defined('Redis::COMPRESSION_LZF')) { return false; } - return $client->getOption(\Redis::OPT_COMPRESSION) === \Redis::COMPRESSION_LZF; + return $this->client->getOption(\Redis::OPT_COMPRESSION) === \Redis::COMPRESSION_LZF; } } diff --git a/tests/Mutex/AbstractRedlockMutexTest.php b/tests/Mutex/DistributedMutexTest.php similarity index 92% rename from tests/Mutex/AbstractRedlockMutexTest.php rename to tests/Mutex/DistributedMutexTest.php index 807ee1d..161d425 100644 --- a/tests/Mutex/AbstractRedlockMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -8,7 +8,7 @@ use Malkusch\Lock\Exception\LockAcquireTimeoutException; use Malkusch\Lock\Exception\LockReleaseException; use Malkusch\Lock\Exception\MutexException; -use Malkusch\Lock\Mutex\AbstractRedlockMutex; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Util\LockUtil; use phpmock\environment\SleepEnvironmentBuilder; use phpmock\MockEnabledException; @@ -17,7 +17,7 @@ use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; -class AbstractRedlockMutexTest extends TestCase +class DistributedMutexTest extends TestCase { use PHPMock; @@ -43,9 +43,9 @@ protected function setUp(): void /** * @param int $count The amount of redis APIs * - * @return AbstractRedlockMutex&MockObject + * @return DistributedMutex&MockObject */ - private function createRedlockMutexMock(int $count, float $acquireTimeout = 1, float $expireTimeout = \INF): AbstractRedlockMutex + private function createDistributedMutexMock(int $count, float $acquireTimeout = 1, float $expireTimeout = \INF): DistributedMutex { $clients = array_map( static fn ($i) => new class($i) { @@ -59,7 +59,7 @@ public function __construct(int $i) range(0, $count - 1) ); - return $this->getMockBuilder(AbstractRedlockMutex::class) + return $this->getMockBuilder(DistributedMutex::class) ->setConstructorArgs([$clients, 'test', $acquireTimeout, $expireTimeout]) ->onlyMethods(['add', 'evalScript']) ->getMock(); @@ -79,7 +79,7 @@ public function testTooFewServerToAcquire(int $count, int $available): void $this->expectException(LockAcquireException::class); $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $i = 0; $mutex->expects(self::exactly($count)) @@ -112,7 +112,7 @@ static function () use (&$i, $available): bool { #[DataProvider('provideMajorityCases')] public function testFaultTolerance(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) ->method('evalScript') ->willReturn(true); @@ -149,7 +149,7 @@ public function testAcquireTooFewKeys(int $count, int $available): void $this->expectException(LockAcquireTimeoutException::class); $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $i = 0; $mutex->expects(self::any()) @@ -182,7 +182,7 @@ public function testAcquireTimeouts(int $count, float $timeout, float $delay): v $this->expectException(LockAcquireTimeoutException::class); $this->expectExceptionMessage('Lock acquire timeout of ' . LockUtil::getInstance()->formatTimeout($timeout) . ' seconds has been exceeded'); - $mutex = $this->createRedlockMutexMock($count, $timeout, $timeout); + $mutex = $this->createDistributedMutexMock($count, $timeout, $timeout); $mutex->expects(self::exactly($count)) ->method('evalScript') ->willReturn(true); @@ -220,7 +220,7 @@ public static function provideAcquireTimeoutsCases(): iterable #[DataProvider('provideMajorityCases')] public function testAcquireWithMajority(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) ->method('evalScript') ->willReturn(true); @@ -250,7 +250,7 @@ static function () use (&$i, $available): bool { #[DataProvider('provideMinorityCases')] public function testTooFewServersToRelease(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) ->method('add') ->willReturn(true); @@ -286,7 +286,7 @@ static function () use (&$i, $available): bool { #[DataProvider('provideMinorityCases')] public function testReleaseTooFewKeys(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) ->method('add') ->willReturn(true); From dde7c7f2b49c75b6b3376b18c9f10cc5b1285b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 02:18:38 +0100 Subject: [PATCH 02/22] more --- phpstan.neon.dist | 2 +- src/Mutex/RedisMutex.php | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 872f626..3152f90 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -12,7 +12,7 @@ parameters: ignoreErrors: # TODO - - path: 'src/Mutex/AbstractRedlockMutex.php' + path: 'src/Mutex/DistributedMutex.php' identifier: if.condNotBoolean message: '~^Only booleans are allowed in an if condition, mixed given\.$~' count: 1 diff --git a/src/Mutex/RedisMutex.php b/src/Mutex/RedisMutex.php index 0641be8..95b24ec 100644 --- a/src/Mutex/RedisMutex.php +++ b/src/Mutex/RedisMutex.php @@ -24,8 +24,8 @@ class RedisMutex extends AbstractSpinlockWithTokenMutex * The Redis instance needs to be connected. I.e. Redis::connect() was called already. * * @param TClient $client - * @param float $acquireTimeout In seconds - * @param float $expireTimeout In seconds + * @param float $acquireTimeout In seconds + * @param float $expireTimeout In seconds */ public function __construct(object $client, string $name, float $acquireTimeout = 3, float $expireTimeout = \INF) { @@ -35,8 +35,6 @@ public function __construct(object $client, string $name, float $acquireTimeout } /** - * @param TClient $client - * * @phpstan-assert-if-true \Redis|\RedisCluster $this->client */ private function isClientPHPRedis(): bool From 1b2b1a85562914b167773fd52c0a8d201d272e35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 02:31:04 +0100 Subject: [PATCH 03/22] more --- src/Mutex/DistributedMutex.php | 35 ++++++++++------------------------ src/Mutex/RedisMutex.php | 16 ++++++++++++++-- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index 64331a5..0388e19 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -20,16 +20,16 @@ class DistributedMutex extends AbstractSpinlockWithTokenMutex implements LoggerA { use LoggerAwareTrait; - /** @var array */ + /** @var array */ private array $clients; /** * The Redis instance needs to be connected. I.e. Redis::connect() was * called already. * - * @param array $clients - * @param float $acquireTimeout In seconds - * @param float $expireTimeout In seconds + * @param array $clients + * @param float $acquireTimeout In seconds + * @param float $expireTimeout In seconds */ public function __construct(array $clients, string $name, float $acquireTimeout = 3, float $expireTimeout = \INF) { @@ -53,18 +53,17 @@ protected function acquireWithToken(string $key, float $expireTimeout) $exception = null; foreach ($this->clients as $index => $client) { try { - if ($this->add($client, $key, $token, $expireTimeout)) { + if ($client->acquireWithToken($key, $expireTimeout)) { ++$acquired; } } catch (LockAcquireException $exception) { // todo if there is only one redis server, throw immediately. - $context = [ + $this->logger->warning('Could not set {key} = {token} at server #{index}', [ 'key' => $key, 'index' => $index, 'token' => $token, 'exception' => $exception, - ]; - $this->logger->warning('Could not set {key} = {token} at server #{index}', $context); + ]); ++$errored; } @@ -99,34 +98,20 @@ protected function acquireWithToken(string $key, float $expireTimeout) #[\Override] protected function releaseWithToken(string $key, string $token): bool { - /* - * All Redis commands must be analyzed before execution to determine which keys the command will operate on. In - * order for this to be true for EVAL, keys must be passed explicitly. - * - * @link https://redis.io/commands/set - */ - $script = <<<'EOD' - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end - EOD; $released = 0; foreach ($this->clients as $index => $client) { try { - if ($this->evalScript($client, $script, [$key], [$token])) { + if ($client->releaseWithToken($key, $token)) { ++$released; } } catch (LockReleaseException $e) { // todo throw if there is only one redis server - $context = [ + $this->logger->warning('Could not unset {key} = {token} at server #{index}', [ 'key' => $key, 'index' => $index, 'token' => $token, 'exception' => $e, - ]; - $this->logger->warning('Could not unset {key} = {token} at server #{index}', $context); + ]); } } diff --git a/src/Mutex/RedisMutex.php b/src/Mutex/RedisMutex.php index 95b24ec..765d31d 100644 --- a/src/Mutex/RedisMutex.php +++ b/src/Mutex/RedisMutex.php @@ -93,11 +93,15 @@ private function makeRedisExpireTimeoutMillis(float $value): int } /** + * Sets the key only if such key doesn't exist at the server yet. + * + * @return bool True if the key was set + * * @throws LockAcquireException */ - protected function add(string $key, string $value, float $expire): bool + protected function add(string $key, string $value, float $expireTimeout): bool { - $expireTimeoutMillis = $this->makeRedisExpireTimeoutMillis($expire); + $expireTimeoutMillis = $this->makeRedisExpireTimeoutMillis($expireTimeout); if ($this->isClientPHPRedis()) { try { @@ -125,6 +129,14 @@ protected function add(string $key, string $value, float $expire): bool } } + /** + * @param list $keys + * @param list $arguments + * + * @return mixed The script result, or false if executing failed + * + * @throws LockReleaseException An unexpected error happened + */ protected function evalScript(string $luaScript, array $keys, array $arguments) { if ($this->isClientPHPRedis()) { From c7f9487b0a32e666a01f05889f0822f4a3ff4726 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 02:43:17 +0100 Subject: [PATCH 04/22] more --- src/Mutex/DistributedMutex.php | 22 +++++++++++----------- tests/Mutex/MutexConcurrencyTest.php | 19 +++++++++++++++---- tests/Mutex/MutexTest.php | 19 +++++++++++++++---- tests/Mutex/RedisMutexWithPredisTest.php | 11 ++--------- 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index 0388e19..fd0babf 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -21,21 +21,21 @@ class DistributedMutex extends AbstractSpinlockWithTokenMutex implements LoggerA use LoggerAwareTrait; /** @var array */ - private array $clients; + private array $mutexes; /** * The Redis instance needs to be connected. I.e. Redis::connect() was * called already. * - * @param array $clients + * @param array $mutexes * @param float $acquireTimeout In seconds * @param float $expireTimeout In seconds */ - public function __construct(array $clients, string $name, float $acquireTimeout = 3, float $expireTimeout = \INF) + public function __construct(array $mutexes, float $acquireTimeout = 3, float $expireTimeout = \INF) { - parent::__construct($name, $acquireTimeout, $expireTimeout); + parent::__construct('', $acquireTimeout, $expireTimeout); - $this->clients = $clients; + $this->mutexes = $mutexes; $this->logger = new NullLogger(); } @@ -51,9 +51,9 @@ protected function acquireWithToken(string $key, float $expireTimeout) $acquired = 0; $errored = 0; $exception = null; - foreach ($this->clients as $index => $client) { + foreach ($this->mutexes as $index => $mutex) { try { - if ($client->acquireWithToken($key, $expireTimeout)) { + if ($mutex->acquireWithToken($key, $expireTimeout)) { ++$acquired; } } catch (LockAcquireException $exception) { @@ -82,7 +82,7 @@ protected function acquireWithToken(string $key, float $expireTimeout) $this->releaseWithToken($key, $token); // In addition to RedLock it's an exception if too many servers fail. - if (!$this->isMajority(count($this->clients) - $errored)) { + if (!$this->isMajority(count($this->mutexes) - $errored)) { assert($exception !== null); // The last exception for some context. throw new LockAcquireException( @@ -99,9 +99,9 @@ protected function acquireWithToken(string $key, float $expireTimeout) protected function releaseWithToken(string $key, string $token): bool { $released = 0; - foreach ($this->clients as $index => $client) { + foreach ($this->mutexes as $index => $mutex) { try { - if ($client->releaseWithToken($key, $token)) { + if ($mutex->releaseWithToken($key, $token)) { ++$released; } } catch (LockReleaseException $e) { @@ -125,6 +125,6 @@ protected function releaseWithToken(string $key, string $token): bool */ private function isMajority(int $count): bool { - return $count > count($this->clients) / 2; + return $count > count($this->mutexes) / 2; } } diff --git a/tests/Mutex/MutexConcurrencyTest.php b/tests/Mutex/MutexConcurrencyTest.php index 0477eda..3d027f2 100644 --- a/tests/Mutex/MutexConcurrencyTest.php +++ b/tests/Mutex/MutexConcurrencyTest.php @@ -5,6 +5,7 @@ namespace Malkusch\Lock\Tests\Mutex; use Eloquent\Liberator\Liberator; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Mutex\FlockMutex; use Malkusch\Lock\Mutex\MemcachedMutex; use Malkusch\Lock\Mutex\Mutex; @@ -204,17 +205,22 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable if (getenv('REDIS_URIS')) { $uris = explode(',', getenv('REDIS_URIS')); - yield 'RedisMutex /w Predis' => [static function ($timeout) use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w Predis' => [static function ($timeout) use ($uris): Mutex { $clients = array_map( static fn ($uri) => new PredisClient($uri), $uris ); - return new RedisMutex($clients, 'test', $timeout); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', $timeout), + $clients + ); + + return new DistributedMutex($mutexes, $timeout); }]; if (class_exists(\Redis::class)) { - yield 'RedisMutex /w PHPRedis' => [ + yield 'DistributedMutex RedisMutex /w PHPRedis' => [ static function ($timeout) use ($uris): Mutex { $clients = array_map( static function (string $uri): \Redis { @@ -235,7 +241,12 @@ static function (string $uri): \Redis { $uris ); - return new RedisMutex($clients, 'test', $timeout); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', $timeout), + $clients + ); + + return new DistributedMutex($mutexes, $timeout); }, ]; } diff --git a/tests/Mutex/MutexTest.php b/tests/Mutex/MutexTest.php index ca4eb18..dc33917 100644 --- a/tests/Mutex/MutexTest.php +++ b/tests/Mutex/MutexTest.php @@ -7,6 +7,7 @@ use Eloquent\Liberator\Liberator; use Malkusch\Lock\Mutex\AbstractLockMutex; use Malkusch\Lock\Mutex\AbstractSpinlockMutex; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Mutex\FlockMutex; use Malkusch\Lock\Mutex\MemcachedMutex; use Malkusch\Lock\Mutex\Mutex; @@ -122,17 +123,22 @@ protected function release(string $key): bool if (getenv('REDIS_URIS')) { $uris = explode(',', getenv('REDIS_URIS')); - yield 'RedisMutex /w Predis' => [static function () use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w Predis' => [static function () use ($uris): Mutex { $clients = array_map( static fn ($uri) => new PredisClient($uri), $uris ); - return new RedisMutex($clients, 'test', self::TIMEOUT); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', self::TIMEOUT), + $clients + ); + + return new DistributedMutex($mutexes, self::TIMEOUT); }]; if (class_exists(\Redis::class)) { - yield 'RedisMutex /w PHPRedis' => [ + yield 'DistributedMutex RedisMutex /w PHPRedis' => [ static function () use ($uris): Mutex { $clients = array_map( static function ($uri) { @@ -153,7 +159,12 @@ static function ($uri) { $uris ); - return new RedisMutex($clients, 'test', self::TIMEOUT); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', self::TIMEOUT), + $clients + ); + + return new DistributedMutex($mutexes, self::TIMEOUT); }, ]; } diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php index a69e509..79b011e 100644 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ b/tests/Mutex/RedisMutexWithPredisTest.php @@ -12,7 +12,6 @@ use PHPUnit\Framework\TestCase; use Predis\ClientInterface as PredisClientInterface; use Predis\PredisException; -use Psr\Log\LoggerInterface; interface PredisClientInterfaceWithSetAndEvalMethods extends PredisClientInterface { @@ -35,9 +34,6 @@ class RedisMutexWithPredisTest extends TestCase /** @var RedisMutex */ private $mutex; - /** @var LoggerInterface&MockObject */ - private $logger; - #[\Override] protected function setUp(): void { @@ -45,10 +41,7 @@ protected function setUp(): void $this->client = $this->createMock(PredisClientInterfaceWithSetAndEvalMethods::class); - $this->mutex = new RedisMutex([$this->client], 'test', 2.5, 3.5); - - $this->logger = $this->createMock(LoggerInterface::class); - $this->mutex->setLogger($this->logger); + $this->mutex = new RedisMutex($this->client, 'test', 2.5, 3.5); } /** @@ -119,7 +112,7 @@ public function testWorksNormally(): void public function testAcquireExpireTimeoutLimit(): void { - $this->mutex = new RedisMutex([$this->client], 'test'); + $this->mutex = new RedisMutex($this->client, 'test'); $this->client->expects(self::once()) ->method('set') From 787a371468f48409c4fd0839bbe6f027307dbbb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 02:51:42 +0100 Subject: [PATCH 05/22] more --- tests/Mutex/RedisMutexTest.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/Mutex/RedisMutexTest.php b/tests/Mutex/RedisMutexTest.php index eda1089..a6341e1 100644 --- a/tests/Mutex/RedisMutexTest.php +++ b/tests/Mutex/RedisMutexTest.php @@ -7,6 +7,7 @@ use Malkusch\Lock\Exception\LockAcquireException; use Malkusch\Lock\Exception\LockReleaseException; use Malkusch\Lock\Exception\MutexException; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Mutex\RedisMutex; use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\Attributes\RequiresPhpExtension; @@ -146,7 +147,7 @@ private function _eval(string $script, array $args = [], int $numKeys = 0) $this->connections[] = $connection; } - $this->mutex = new RedisMutex($this->connections, 'test'); + $this->mutex = new DistributedMutex(array_map(static fn ($v) => new RedisMutex($v, 'test'), $this->connections)); } #[\Override] From 7bcf5f8957405a6c32bb83a67131fd1fcb528002 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 13:29:40 +0100 Subject: [PATCH 06/22] fix rebase --- phpstan.neon.dist | 5 ----- 1 file changed, 5 deletions(-) diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 3152f90..756c97d 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -11,11 +11,6 @@ parameters: ignoreErrors: # TODO - - - path: 'src/Mutex/DistributedMutex.php' - identifier: if.condNotBoolean - message: '~^Only booleans are allowed in an if condition, mixed given\.$~' - count: 1 - path: 'tests/Mutex/*Test.php' identifier: empty.notAllowed From f2fd2f818c6fde92ea53cc767cdd99c9d72f23b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 15:12:34 +0100 Subject: [PATCH 07/22] working DistributedMutex impl --- src/Mutex/DistributedMutex.php | 64 +++++++++++++++++++++------- tests/Mutex/DistributedMutexTest.php | 55 ++++++++++++++---------- 2 files changed, 82 insertions(+), 37 deletions(-) diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index fd0babf..779493e 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -42,26 +42,25 @@ public function __construct(array $mutexes, float $acquireTimeout = 3, float $ex #[\Override] protected function acquireWithToken(string $key, float $expireTimeout) { - $token = LockUtil::getInstance()->makeRandomToken(); + $acquireTimeout = \Closure::bind(fn () => $this->acquireTimeout, $this, AbstractSpinlockMutex::class)(); // 1. This differs from the specification to avoid an overflow on 32-Bit systems. $startTs = microtime(true); // 2. - $acquired = 0; + $acquiredIndexes = []; $errored = 0; $exception = null; foreach ($this->mutexes as $index => $mutex) { try { - if ($mutex->acquireWithToken($key, $expireTimeout)) { - ++$acquired; + if ($this->acquireMutex($mutex, $key, $acquireTimeout, $expireTimeout)) { + $acquiredIndexes[] = $index; } } catch (LockAcquireException $exception) { // todo if there is only one redis server, throw immediately. $this->logger->warning('Could not set {key} = {token} at server #{index}', [ 'key' => $key, 'index' => $index, - 'token' => $token, 'exception' => $exception, ]); @@ -71,18 +70,20 @@ protected function acquireWithToken(string $key, float $expireTimeout) // 3. $elapsedTime = microtime(true) - $startTs; - $isAcquired = $this->isMajority($acquired) && $elapsedTime <= $expireTimeout; + $isAcquired = $this->isCountMajority(count($acquiredIndexes)) && $elapsedTime <= $expireTimeout; if ($isAcquired) { // 4. - return $token; + return LockUtil::getInstance()->makeRandomToken(); } // 5. - $this->releaseWithToken($key, $token); + foreach ($acquiredIndexes as $index) { + $this->releaseMutex($this->mutexes[$index], $key, $acquireTimeout); + } // In addition to RedLock it's an exception if too many servers fail. - if (!$this->isMajority(count($this->mutexes) - $errored)) { + if (!$this->isCountMajority(count($this->mutexes) - $errored)) { assert($exception !== null); // The last exception for some context. throw new LockAcquireException( @@ -98,10 +99,14 @@ protected function acquireWithToken(string $key, float $expireTimeout) #[\Override] protected function releaseWithToken(string $key, string $token): bool { + unset($token); + + $acquireTimeout = \Closure::bind(fn () => $this->acquireTimeout, $this, AbstractSpinlockMutex::class)(); + $released = 0; foreach ($this->mutexes as $index => $mutex) { try { - if ($mutex->releaseWithToken($key, $token)) { + if ($this->releaseMutex($mutex, $key, $acquireTimeout)) { ++$released; } } catch (LockReleaseException $e) { @@ -109,22 +114,51 @@ protected function releaseWithToken(string $key, string $token): bool $this->logger->warning('Could not unset {key} = {token} at server #{index}', [ 'key' => $key, 'index' => $index, - 'token' => $token, 'exception' => $e, ]); } } - return $this->isMajority($released); + return $this->isCountMajority($released); } /** - * Returns if a count is the majority of all servers. - * * @return bool True if the count is the majority */ - private function isMajority(int $count): bool + private function isCountMajority(int $count): bool { return $count > count($this->mutexes) / 2; } + + /** + * @template T + * + * @param \Closure(): T $fx + * + * @return T + */ + private function executeMutexWithAcquireTimeout(AbstractSpinlockWithTokenMutex $mutex, \Closure $fx, $acquireTimeout) + { + return \Closure::bind(static function () use ($mutex, $fx, $acquireTimeout) { + $origAcquireTimeout = $mutex->acquireTimeout; + if ($acquireTimeout < $mutex->acquireTimeout) { + $mutex->acquireTimeout = $acquireTimeout; + } + try { + return $fx(); + } finally { + $mutex->acquireTimeout = $origAcquireTimeout; + } + }, null, AbstractSpinlockMutex::class)(); + } + + protected function acquireMutex(AbstractSpinlockWithTokenMutex $mutex, string $key, float $acquireTimeout, float $expireTimeout): bool + { + return $this->executeMutexWithAcquireTimeout($mutex, static fn () => $mutex->acquireWithToken($key, $expireTimeout), $acquireTimeout); + } + + protected function releaseMutex(AbstractSpinlockWithTokenMutex $mutex, string $key, float $acquireTimeout): bool + { + return $this->executeMutexWithAcquireTimeout($mutex, static fn () => $mutex->release($key), $acquireTimeout); + } } diff --git a/tests/Mutex/DistributedMutexTest.php b/tests/Mutex/DistributedMutexTest.php index 161d425..14e6315 100644 --- a/tests/Mutex/DistributedMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -8,6 +8,7 @@ use Malkusch\Lock\Exception\LockAcquireTimeoutException; use Malkusch\Lock\Exception\LockReleaseException; use Malkusch\Lock\Exception\MutexException; +use Malkusch\Lock\Mutex\AbstractSpinlockWithTokenMutex; use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Util\LockUtil; use phpmock\environment\SleepEnvironmentBuilder; @@ -47,21 +48,31 @@ protected function setUp(): void */ private function createDistributedMutexMock(int $count, float $acquireTimeout = 1, float $expireTimeout = \INF): DistributedMutex { - $clients = array_map( - static fn ($i) => new class($i) { - public int $i; - - public function __construct(int $i) - { - $this->i = $i; - } + $mutexes = array_map( + function (int $i) { + $mutex = $this->getMockBuilder(AbstractSpinlockWithTokenMutex::class) + ->setConstructorArgs(['test', \INF]) + ->onlyMethods(['acquireWithToken', 'releaseWithToken']) + ->getMock(); + + $mutex + ->method('acquireWithToken') + ->with(self::anything(), \INF) + ->willReturn('x' . $i); + + $mutex + ->method('releaseWithToken') + ->with(self::anything(), 'x' . $i) + ->willReturn(true); + + return $mutex; }, range(0, $count - 1) ); return $this->getMockBuilder(DistributedMutex::class) - ->setConstructorArgs([$clients, 'test', $acquireTimeout, $expireTimeout]) - ->onlyMethods(['add', 'evalScript']) + ->setConstructorArgs([$mutexes, $acquireTimeout, $expireTimeout]) + ->onlyMethods(['acquireMutex', 'releaseMutex']) ->getMock(); } @@ -83,7 +94,7 @@ public function testTooFewServerToAcquire(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { if ($i < $available) { @@ -114,12 +125,12 @@ public function testFaultTolerance(int $count, int $available): void { $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) - ->method('evalScript') + ->method('releaseMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { if ($i < $available) { @@ -153,7 +164,7 @@ public function testAcquireTooFewKeys(int $count, int $available): void $i = 0; $mutex->expects(self::any()) - ->method('add') + ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { ++$i; @@ -184,11 +195,11 @@ public function testAcquireTimeouts(int $count, float $timeout, float $delay): v $mutex = $this->createDistributedMutexMock($count, $timeout, $timeout); $mutex->expects(self::exactly($count)) - ->method('evalScript') + ->method('releaseMutex') ->willReturn(true); $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturnCallback(static function () use ($delay): bool { usleep((int) ($delay * 1e6)); @@ -222,12 +233,12 @@ public function testAcquireWithMajority(int $count, int $available): void { $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) - ->method('evalScript') + ->method('releaseMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { ++$i; @@ -252,12 +263,12 @@ public function testTooFewServersToRelease(int $count, int $available): void { $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('evalScript') + ->method('releaseMutex') ->willReturnCallback( static function () use (&$i, $available): bool { if ($i < $available) { @@ -288,12 +299,12 @@ public function testReleaseTooFewKeys(int $count, int $available): void { $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('evalScript') + ->method('releaseMutex') ->willReturnCallback( static function () use (&$i, $available): bool { ++$i; From 722969d9a12a2c0682dcfaec1d6f4d27d98471ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 16:28:10 +0100 Subject: [PATCH 08/22] fix RedisMutexWithPredisTest - todo test the warnings in distributed test --- tests/Mutex/RedisMutexWithPredisTest.php | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php index 79b011e..e228371 100644 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ b/tests/Mutex/RedisMutexWithPredisTest.php @@ -54,9 +54,6 @@ public function testAddFailsToSetKey(): void ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') ->willReturn(null); - $this->logger->expects(self::never()) - ->method('warning'); - $this->expectException(LockAcquireException::class); $this->mutex->synchronized( @@ -76,10 +73,6 @@ public function testAddErrors(): void ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') ->willThrowException($this->createMock(PredisException::class)); - $this->logger->expects(self::once()) - ->method('warning') - ->with('Could not set {key} = {token} at server #{index}', self::anything()); - $this->expectException(LockAcquireException::class); $this->mutex->synchronized( @@ -102,7 +95,6 @@ public function testWorksNormally(): void ->willReturn(true); $executed = false; - $this->mutex->synchronized(static function () use (&$executed): void { $executed = true; }); @@ -142,14 +134,9 @@ public function testEvalScriptFails(): void ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) ->willThrowException($this->createMock(PredisException::class)); - $this->logger->expects(self::once()) - ->method('warning') - ->with('Could not unset {key} = {token} at server #{index}', self::anything()); - - $executed = false; - $this->expectException(LockReleaseException::class); + $executed = false; $this->mutex->synchronized(static function () use (&$executed): void { $executed = true; }); From e5f744bc49b493bbe81ff571cd3cc79dcc6b2c86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 16:34:01 +0100 Subject: [PATCH 09/22] fix stan --- src/Mutex/DistributedMutex.php | 2 +- tests/Mutex/RedisMutexTest.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index 779493e..5d3678a 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -137,7 +137,7 @@ private function isCountMajority(int $count): bool * * @return T */ - private function executeMutexWithAcquireTimeout(AbstractSpinlockWithTokenMutex $mutex, \Closure $fx, $acquireTimeout) + private function executeMutexWithAcquireTimeout(AbstractSpinlockWithTokenMutex $mutex, \Closure $fx, float $acquireTimeout) { return \Closure::bind(static function () use ($mutex, $fx, $acquireTimeout) { $origAcquireTimeout = $mutex->acquireTimeout; diff --git a/tests/Mutex/RedisMutexTest.php b/tests/Mutex/RedisMutexTest.php index a6341e1..fc2e65f 100644 --- a/tests/Mutex/RedisMutexTest.php +++ b/tests/Mutex/RedisMutexTest.php @@ -147,7 +147,7 @@ private function _eval(string $script, array $args = [], int $numKeys = 0) $this->connections[] = $connection; } - $this->mutex = new DistributedMutex(array_map(static fn ($v) => new RedisMutex($v, 'test'), $this->connections)); + $this->mutex = new DistributedMutex(array_map(static fn ($v) => new RedisMutex($v, 'test'), $this->connections)); // @phpstan-ignore assign.propertyType } #[\Override] From cb086ec5ec29d3198fb7f7efa679310ab47aecc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 16:44:40 +0100 Subject: [PATCH 10/22] any AbstractSpinlockMutex support --- src/Mutex/DistributedMutex.php | 38 +++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index 5d3678a..29888fa 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -20,16 +20,16 @@ class DistributedMutex extends AbstractSpinlockWithTokenMutex implements LoggerA { use LoggerAwareTrait; - /** @var array */ + /** @var array */ private array $mutexes; /** * The Redis instance needs to be connected. I.e. Redis::connect() was * called already. * - * @param array $mutexes - * @param float $acquireTimeout In seconds - * @param float $expireTimeout In seconds + * @param array $mutexes + * @param float $acquireTimeout In seconds + * @param float $expireTimeout In seconds */ public function __construct(array $mutexes, float $acquireTimeout = 3, float $expireTimeout = \INF) { @@ -79,7 +79,7 @@ protected function acquireWithToken(string $key, float $expireTimeout) // 5. foreach ($acquiredIndexes as $index) { - $this->releaseMutex($this->mutexes[$index], $key, $acquireTimeout); + $this->releaseMutex($this->mutexes[$index], $key, $expireTimeout); } // In addition to RedLock it's an exception if too many servers fail. @@ -101,12 +101,12 @@ protected function releaseWithToken(string $key, string $token): bool { unset($token); - $acquireTimeout = \Closure::bind(fn () => $this->acquireTimeout, $this, AbstractSpinlockMutex::class)(); + $expireTimeout = \Closure::bind(fn () => $this->expireTimeout, $this, parent::class)(); $released = 0; foreach ($this->mutexes as $index => $mutex) { try { - if ($this->releaseMutex($mutex, $key, $acquireTimeout)) { + if ($this->releaseMutex($mutex, $key, $expireTimeout)) { ++$released; } } catch (LockReleaseException $e) { @@ -137,8 +137,22 @@ private function isCountMajority(int $count): bool * * @return T */ - private function executeMutexWithAcquireTimeout(AbstractSpinlockWithTokenMutex $mutex, \Closure $fx, float $acquireTimeout) + private function executeMutexWithMinTimeouts(AbstractSpinlockMutex $mutex, \Closure $fx, float $acquireTimeout, float $expireTimeout) { + if ($mutex instanceof AbstractSpinlockWithTokenMutex) { + return \Closure::bind(static function () use ($mutex, $fx, $expireTimeout) { + $origExpireTimeout = $mutex->expireTimeout; + if ($expireTimeout < $mutex->expireTimeout) { + $mutex->expireTimeout = $expireTimeout; + } + try { + return $fx(); + } finally { + $mutex->expireTimeout = $origExpireTimeout; + } + }, null, parent::class)(); + } + return \Closure::bind(static function () use ($mutex, $fx, $acquireTimeout) { $origAcquireTimeout = $mutex->acquireTimeout; if ($acquireTimeout < $mutex->acquireTimeout) { @@ -152,13 +166,13 @@ private function executeMutexWithAcquireTimeout(AbstractSpinlockWithTokenMutex $ }, null, AbstractSpinlockMutex::class)(); } - protected function acquireMutex(AbstractSpinlockWithTokenMutex $mutex, string $key, float $acquireTimeout, float $expireTimeout): bool + protected function acquireMutex(AbstractSpinlockMutex $mutex, string $key, float $acquireTimeout, float $expireTimeout): bool { - return $this->executeMutexWithAcquireTimeout($mutex, static fn () => $mutex->acquireWithToken($key, $expireTimeout), $acquireTimeout); + return $this->executeMutexWithMinTimeouts($mutex, static fn () => $mutex->acquire($key), $acquireTimeout, $expireTimeout); } - protected function releaseMutex(AbstractSpinlockWithTokenMutex $mutex, string $key, float $acquireTimeout): bool + protected function releaseMutex(AbstractSpinlockMutex $mutex, string $key, float $expireTimeout): bool { - return $this->executeMutexWithAcquireTimeout($mutex, static fn () => $mutex->release($key), $acquireTimeout); + return $this->executeMutexWithMinTimeouts($mutex, static fn () => $mutex->release($key), \INF, $expireTimeout); } } From 49b4b8a7a8d4ec9d70e54ba510c61faf3553e249 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 17:05:54 +0100 Subject: [PATCH 11/22] fix redis strict types --- src/Mutex/RedisMutex.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Mutex/RedisMutex.php b/src/Mutex/RedisMutex.php index 765d31d..f15565d 100644 --- a/src/Mutex/RedisMutex.php +++ b/src/Mutex/RedisMutex.php @@ -67,7 +67,7 @@ protected function releaseWithToken(string $key, string $token): bool end LUA; - return $this->evalScript($script, [$key], [$token]); + return (int) $this->evalScript($script, [$key], [$token]) === 1; } private function makeRedisExpireTimeoutMillis(float $value): int From c94397c79fd713b7feefec47f44d84942c1796d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 17:08:56 +0100 Subject: [PATCH 12/22] improve SemaphoreMutex exception messages --- src/Mutex/SemaphoreMutex.php | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Mutex/SemaphoreMutex.php b/src/Mutex/SemaphoreMutex.php index a1f4bec..9feb240 100644 --- a/src/Mutex/SemaphoreMutex.php +++ b/src/Mutex/SemaphoreMutex.php @@ -29,8 +29,9 @@ class SemaphoreMutex extends AbstractLockMutex public function __construct($semaphore) { if (!$semaphore instanceof \SysvSemaphore && !is_resource($semaphore)) { - throw new \InvalidArgumentException('The semaphore id is not a valid resource'); + throw new \InvalidArgumentException('Invalid System V semaphore'); } + $this->semaphore = $semaphore; } @@ -38,7 +39,7 @@ public function __construct($semaphore) protected function lock(): void { if (!sem_acquire($this->semaphore)) { - throw new LockAcquireException('Failed to acquire the Semaphore'); + throw new LockAcquireException('Failed to acquire a System V semaphore'); } } @@ -46,7 +47,7 @@ protected function lock(): void protected function unlock(): void { if (!sem_release($this->semaphore)) { - throw new LockReleaseException('Failed to release the Semaphore'); + throw new LockReleaseException('Failed to release the System V semaphore'); } } } From 10064742398b47be6c051db715ef3148f9ced0f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 18:27:08 +0100 Subject: [PATCH 13/22] fix dist mutex release --- src/Mutex/DistributedMutex.php | 37 ++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index 29888fa..f182ddd 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -23,6 +23,9 @@ class DistributedMutex extends AbstractSpinlockWithTokenMutex implements LoggerA /** @var array */ private array $mutexes; + /** @var list */ + private ?array $lockedMutexIndexes = null; + /** * The Redis instance needs to be connected. I.e. Redis::connect() was * called already. @@ -57,7 +60,6 @@ protected function acquireWithToken(string $key, float $expireTimeout) $acquiredIndexes[] = $index; } } catch (LockAcquireException $exception) { - // todo if there is only one redis server, throw immediately. $this->logger->warning('Could not set {key} = {token} at server #{index}', [ 'key' => $key, 'index' => $index, @@ -73,6 +75,8 @@ protected function acquireWithToken(string $key, float $expireTimeout) $isAcquired = $this->isCountMajority(count($acquiredIndexes)) && $elapsedTime <= $expireTimeout; if ($isAcquired) { + $this->lockedMutexIndexes = $acquiredIndexes; + // 4. return LockUtil::getInstance()->makeRandomToken(); } @@ -103,23 +107,26 @@ protected function releaseWithToken(string $key, string $token): bool $expireTimeout = \Closure::bind(fn () => $this->expireTimeout, $this, parent::class)(); - $released = 0; - foreach ($this->mutexes as $index => $mutex) { - try { - if ($this->releaseMutex($mutex, $key, $expireTimeout)) { - ++$released; + try { + $released = 0; + foreach ($this->lockedMutexIndexes as $index) { + try { + if ($this->releaseMutex($this->mutexes[$index], $key, $expireTimeout)) { + ++$released; + } + } catch (LockReleaseException $e) { + $this->logger->warning('Could not unset {key} = {token} at server #{index}', [ + 'key' => $key, + 'index' => $index, + 'exception' => $e, + ]); } - } catch (LockReleaseException $e) { - // todo throw if there is only one redis server - $this->logger->warning('Could not unset {key} = {token} at server #{index}', [ - 'key' => $key, - 'index' => $index, - 'exception' => $e, - ]); } - } - return $this->isCountMajority($released); + return $this->isCountMajority($released); + } finally { + $this->lockedMutexIndexes = null; + } } /** From cb0870926c96e38f4982601b3edff59302564614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 18:37:08 +0100 Subject: [PATCH 14/22] adjust tests - dist lock fails earlier --- src/Mutex/DistributedMutex.php | 2 +- tests/Mutex/DistributedMutexTest.php | 28 ++++++++-------------------- 2 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index f182ddd..c9b2ab1 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -56,7 +56,7 @@ protected function acquireWithToken(string $key, float $expireTimeout) $exception = null; foreach ($this->mutexes as $index => $mutex) { try { - if ($this->acquireMutex($mutex, $key, $acquireTimeout, $expireTimeout)) { + if ($this->acquireMutex($mutex, $key, $acquireTimeout - (microtime(true) - $startTs), $expireTimeout)) { $acquiredIndexes[] = $index; } } catch (LockAcquireException $exception) { diff --git a/tests/Mutex/DistributedMutexTest.php b/tests/Mutex/DistributedMutexTest.php index 14e6315..d0a2904 100644 --- a/tests/Mutex/DistributedMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -97,9 +97,7 @@ public function testTooFewServerToAcquire(int $count, int $available): void ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { - if ($i < $available) { - ++$i; - + if ($i++ < $available) { return true; } @@ -124,7 +122,7 @@ static function () use (&$i, $available): bool { public function testFaultTolerance(int $count, int $available): void { $mutex = $this->createDistributedMutexMock($count); - $mutex->expects(self::exactly($count)) + $mutex->expects(self::exactly($available)) ->method('releaseMutex') ->willReturn(true); @@ -133,9 +131,7 @@ public function testFaultTolerance(int $count, int $available): void ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { - if ($i < $available) { - ++$i; - + if ($i++ < $available) { return true; } @@ -167,9 +163,7 @@ public function testAcquireTooFewKeys(int $count, int $available): void ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { - ++$i; - - return $i <= $available; + return ++$i <= $available; } ); @@ -232,7 +226,7 @@ public static function provideAcquireTimeoutsCases(): iterable public function testAcquireWithMajority(int $count, int $available): void { $mutex = $this->createDistributedMutexMock($count); - $mutex->expects(self::exactly($count)) + $mutex->expects(self::exactly($available)) ->method('releaseMutex') ->willReturn(true); @@ -241,9 +235,7 @@ public function testAcquireWithMajority(int $count, int $available): void ->method('acquireMutex') ->willReturnCallback( static function () use (&$i, $available): bool { - ++$i; - - return $i <= $available; + return ++$i <= $available; } ); @@ -271,9 +263,7 @@ public function testTooFewServersToRelease(int $count, int $available): void ->method('releaseMutex') ->willReturnCallback( static function () use (&$i, $available): bool { - if ($i < $available) { - ++$i; - + if ($i++ < $available) { return true; } @@ -307,9 +297,7 @@ public function testReleaseTooFewKeys(int $count, int $available): void ->method('releaseMutex') ->willReturnCallback( static function () use (&$i, $available): bool { - ++$i; - - return $i <= $available; + return ++$i <= $available; } ); From 02f84bfa6f7a35a446c96a99d0e38ac63aea3737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 21:38:43 +0100 Subject: [PATCH 15/22] add distributed lock to README and simplify it --- README.md | 85 ++++++++++++++++--------------------------------------- 1 file changed, 24 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index 5611e34..4d63eec 100644 --- a/README.md +++ b/README.md @@ -65,10 +65,7 @@ return value `false` or `null` should be seen as a failed action. Example: ```php -$newBalance = $mutex->synchronized(function () use ( - $bankAccount, - $amount -): int { +$newBalance = $mutex->synchronized(static function () use ($bankAccount, $amount) { $balance = $bankAccount->getBalance(); $balance -= $amount; if ($balance < 0) { @@ -108,9 +105,9 @@ this return value will not be checked by the library. Example: ```php -$newBalance = $mutex->check(function () use ($bankAccount, $amount): bool { +$newBalance = $mutex->check(static function () use ($bankAccount, $amount): bool { return $bankAccount->getBalance() >= $amount; -})->then(function () use ($bankAccount, $amount): int { +})->then(static function () use ($bankAccount, $amount) { $balance = $bankAccount->getBalance(); $balance -= $amount; $bankAccount->setBalance($balance); @@ -118,7 +115,7 @@ $newBalance = $mutex->check(function () use ($bankAccount, $amount): bool { return $balance; }); -if ($newBalance === false) { +if (!$newBalance) { if ($balance < 0) { throw new \DomainException('You have no credit'); } @@ -136,8 +133,8 @@ In order to read the code result (or an exception thrown there), Example: ```php try { - // or $mutex->check(...) - $result = $mutex->synchronized(function () { + // OR $mutex->check(...) + $result = $mutex->synchronized(static function () { if (someCondition()) { throw new \DomainException(); } @@ -149,7 +146,7 @@ try { $codeException = $unlockException->getCodeException(); // do something with the code exception } else { - $code_result = $unlockException->getCodeResult(); + $codeResult = $unlockException->getCodeResult(); // do something with the code result } @@ -168,7 +165,8 @@ implementations or create/extend your own implementation. - [`RedisMutex`](#redismutex) - [`SemaphoreMutex`](#semaphoremutex) - [`MySQLMutex`](#mysqlmutex) -- [`PostgreSQLMutex`](#PostgreSQLMutex) +- [`PostgreSQLMutex`](#postgresqlmutex) +- [`DistributedMutex`](#distributedmutex) #### FlockMutex @@ -178,14 +176,6 @@ The **FlockMutex** is a lock implementation based on Example: ```php $mutex = new FlockMutex(fopen(__FILE__, 'r')); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` Timeouts are supported as an optional second argument. This uses the `ext-pcntl` @@ -202,14 +192,6 @@ $memcached = new \Memcached(); $memcached->addServer('localhost', 11211); $mutex = new MemcachedMutex('balance', $memcached); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` #### RedisMutex @@ -231,14 +213,6 @@ $redis->connect('localhost'); // OR $redis = new \Predis\Client('redis://localhost'); $mutex = new RedisMutex([$redis], 'balance'); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` #### SemaphoreMutex @@ -250,14 +224,6 @@ Example: ```php $semaphore = sem_get(ftok(__FILE__, 'a')); $mutex = new SemaphoreMutex($semaphore); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` #### MySQLMutex @@ -280,16 +246,7 @@ you to namespace your locks like `dbname.lockname`. ```php $pdo = new \PDO('mysql:host=localhost;dbname=test', 'username'); - $mutex = new MySQLMutex($pdo, 'balance', 15); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` #### PostgreSQLMutex @@ -306,16 +263,21 @@ interrupted, the lock is automatically released. ```php $pdo = new \PDO('pgsql:host=localhost;dbname=test', 'username'); - $mutex = new PostgreSQLMutex($pdo, 'balance'); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); +``` + +#### DistributedMutex + +The **DistributedMutex** is the distributed lock implementation of +[RedLock](https://redis.io/topics/distlock#the-redlock-algorithm) which supports +one or more [`Malkush\Lock\Mutex\AbstractSpinlockMutex`][10] instances. + +Example: +```php +$mutex = new DistributedMutex([ + new \Predis\Client('redis://10.0.0.1'), + new \Predis\Client('redis://10.0.0.2'), +], 'balance'); ``` ## Authors @@ -341,3 +303,4 @@ This project is free and is licensed under the MIT. [9]: https://en.wikipedia.org/wiki/Double-checked_locking [10]: https://github.com/php-lock/lock/blob/3ca295ccda/src/Mutex/AbstractLockMutex.php [11]: https://github.com/php-lock/lock/blob/3ca295ccda/src/Exception/LockReleaseException.php +[12]: https://github.com/php-lock/lock/blob/41509dda0a/src/Mutex/AbstractSpinlockMutex.php#L15 From e52ffb5ea735da9922c3df7db5b63f33f6db4d1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 21:39:23 +0100 Subject: [PATCH 16/22] improve cs --- src/Mutex/FlockMutex.php | 8 ++- tests/Mutex/DistributedMutexTest.php | 66 ++++++++++-------------- tests/Mutex/FlockMutexTest.php | 18 +++---- tests/Mutex/RedisMutexTest.php | 9 ++-- tests/Mutex/RedisMutexWithPredisTest.php | 16 +++--- 5 files changed, 48 insertions(+), 69 deletions(-) diff --git a/src/Mutex/FlockMutex.php b/src/Mutex/FlockMutex.php index 6aef156..f04b309 100644 --- a/src/Mutex/FlockMutex.php +++ b/src/Mutex/FlockMutex.php @@ -74,11 +74,9 @@ private function lockPcntl(): void $timebox = new PcntlTimeout($acquireTimeoutInt); try { - $timebox->timeBoxed( - function (): void { - $this->lockBlocking(); - } - ); + $timebox->timeBoxed(function (): void { + $this->lockBlocking(); + }); } catch (DeadlineException $e) { throw LockAcquireTimeoutException::create($acquireTimeoutInt); } diff --git a/tests/Mutex/DistributedMutexTest.php b/tests/Mutex/DistributedMutexTest.php index d0a2904..2a618a3 100644 --- a/tests/Mutex/DistributedMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -95,15 +95,13 @@ public function testTooFewServerToAcquire(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('acquireMutex') - ->willReturnCallback( - static function () use (&$i, $available): bool { - if ($i++ < $available) { - return true; - } - - throw new LockAcquireException(); + ->willReturnCallback(static function () use (&$i, $available): bool { + if ($i++ < $available) { + return true; } - ); + + throw new LockAcquireException(); + }); $mutex->synchronized(static function (): void { self::fail(); @@ -129,15 +127,13 @@ public function testFaultTolerance(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('acquireMutex') - ->willReturnCallback( - static function () use (&$i, $available): bool { - if ($i++ < $available) { - return true; - } - - throw new LockAcquireException(); + ->willReturnCallback(static function () use (&$i, $available): bool { + if ($i++ < $available) { + return true; } - ); + + throw new LockAcquireException(); + }); $mutex->synchronized(static function () {}); } @@ -161,11 +157,9 @@ public function testAcquireTooFewKeys(int $count, int $available): void $i = 0; $mutex->expects(self::any()) ->method('acquireMutex') - ->willReturnCallback( - static function () use (&$i, $available): bool { - return ++$i <= $available; - } - ); + ->willReturnCallback(static function () use (&$i, $available): bool { + return ++$i <= $available; + }); $mutex->synchronized(static function (): void { self::fail(); @@ -233,11 +227,9 @@ public function testAcquireWithMajority(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('acquireMutex') - ->willReturnCallback( - static function () use (&$i, $available): bool { - return ++$i <= $available; - } - ); + ->willReturnCallback(static function () use (&$i, $available): bool { + return ++$i <= $available; + }); $mutex->synchronized(static function (): void {}); } @@ -261,15 +253,13 @@ public function testTooFewServersToRelease(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('releaseMutex') - ->willReturnCallback( - static function () use (&$i, $available): bool { - if ($i++ < $available) { - return true; - } - - throw new LockReleaseException(); + ->willReturnCallback(static function () use (&$i, $available): bool { + if ($i++ < $available) { + return true; } - ); + + throw new LockReleaseException(); + }); $this->expectException(LockReleaseException::class); @@ -295,11 +285,9 @@ public function testReleaseTooFewKeys(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('releaseMutex') - ->willReturnCallback( - static function () use (&$i, $available): bool { - return ++$i <= $available; - } - ); + ->willReturnCallback(static function () use (&$i, $available): bool { + return ++$i <= $available; + }); $this->expectException(LockReleaseException::class); diff --git a/tests/Mutex/FlockMutexTest.php b/tests/Mutex/FlockMutexTest.php index 63c7043..96d5e50 100644 --- a/tests/Mutex/FlockMutexTest.php +++ b/tests/Mutex/FlockMutexTest.php @@ -67,19 +67,17 @@ public function testAcquireTimeoutOccurs(string $strategy): void $this->expectException(LockAcquireTimeoutException::class); $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); - $another_resource = fopen($this->file, 'r'); - flock($another_resource, \LOCK_EX); + $anotherResource = fopen($this->file, 'r'); + flock($anotherResource, \LOCK_EX); $this->mutex->strategy = $strategy; // @phpstan-ignore property.private try { - $this->mutex->synchronized( - static function () { - self::fail(); - } - ); + $this->mutex->synchronized(static function () { + self::fail(); + }); } finally { - fclose($another_resource); + fclose($anotherResource); } } @@ -103,8 +101,8 @@ public function testNoTimeoutWaitsForever(): void { $this->expectException(DeadlineException::class); - $another_resource = fopen($this->file, 'r'); - flock($another_resource, \LOCK_EX); + $anotherResource = fopen($this->file, 'r'); + flock($anotherResource, \LOCK_EX); $this->mutex->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_BLOCK, null, FlockMutex::class)(); // @phpstan-ignore property.private diff --git a/tests/Mutex/RedisMutexTest.php b/tests/Mutex/RedisMutexTest.php index fc2e65f..45a84d3 100644 --- a/tests/Mutex/RedisMutexTest.php +++ b/tests/Mutex/RedisMutexTest.php @@ -91,14 +91,13 @@ protected function setUp(): void $connection = new class extends \Redis { use RedisCompatibilityTrait; - /** @var bool */ - private $is_closed = false; + private bool $isClosed = false; #[\Override] public function close(): bool { $res = parent::close(); - $this->is_closed = true; + $this->isClosed = true; return $res; } @@ -111,7 +110,7 @@ public function close(): bool */ private function _set(string $key, $value, $timeout = 0) { - if ($this->is_closed) { + if ($this->isClosed) { throw new \RedisException('Connection is closed'); } @@ -125,7 +124,7 @@ private function _set(string $key, $value, $timeout = 0) */ private function _eval(string $script, array $args = [], int $numKeys = 0) { - if ($this->is_closed) { + if ($this->isClosed) { throw new \RedisException('Connection is closed'); } diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php index e228371..70a1b7f 100644 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ b/tests/Mutex/RedisMutexWithPredisTest.php @@ -56,11 +56,9 @@ public function testAddFailsToSetKey(): void $this->expectException(LockAcquireException::class); - $this->mutex->synchronized( - static function (): void { - self::fail(); - } - ); + $this->mutex->synchronized(static function (): void { + self::fail(); + }); } /** @@ -75,11 +73,9 @@ public function testAddErrors(): void $this->expectException(LockAcquireException::class); - $this->mutex->synchronized( - static function () { - self::fail(); - } - ); + $this->mutex->synchronized(static function () { + self::fail(); + }); } public function testWorksNormally(): void From 4483c66500017843edd34cf00709494bfba041b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 21:41:52 +0100 Subject: [PATCH 17/22] adjust RedisMutex README --- README.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4d63eec..0d813ec 100644 --- a/README.md +++ b/README.md @@ -196,10 +196,9 @@ $mutex = new MemcachedMutex('balance', $memcached); #### RedisMutex -The **RedisMutex** is the distributed lock implementation of -[RedLock](https://redis.io/topics/distlock#the-redlock-algorithm) which supports the +The **RedisMutex** is a lock implementation which supports the [`phpredis` extension](https://github.com/phpredis/phpredis) -or [`Predis` API](https://github.com/nrk/predis). +or [`Predis` API](https://github.com/nrk/predis) clients. Both Redis and Valkey servers are supported. @@ -212,7 +211,7 @@ $redis = new \Redis(); $redis->connect('localhost'); // OR $redis = new \Predis\Client('redis://localhost'); -$mutex = new RedisMutex([$redis], 'balance'); +$mutex = new RedisMutex($redis, 'balance'); ``` #### SemaphoreMutex From 474d256fd0bff2af47bb4b5f1c0eb9983cf64f4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 21:58:07 +0100 Subject: [PATCH 18/22] simplify tests cs --- tests/Mutex/AbstractLockMutexTest.php | 4 ++-- tests/Mutex/DistributedMutexTest.php | 26 ++++++++++---------- tests/Mutex/FlockMutexTest.php | 4 ++-- tests/Mutex/MemcachedMutexTest.php | 6 ++--- tests/Mutex/MutexConcurrencyTest.php | 30 ++++++++++++------------ tests/Mutex/MutexTest.php | 26 ++++++++++---------- tests/Mutex/PostgreSQLMutexTest.php | 4 ++-- tests/Mutex/RedisMutexTest.php | 8 +++---- tests/Mutex/RedisMutexWithPredisTest.php | 8 +++---- tests/Util/DoubleCheckedLockingTest.php | 22 +++++++---------- tests/Util/LoopTest.php | 12 +++++----- 11 files changed, 73 insertions(+), 77 deletions(-) diff --git a/tests/Mutex/AbstractLockMutexTest.php b/tests/Mutex/AbstractLockMutexTest.php index f3ed7d3..731c51f 100644 --- a/tests/Mutex/AbstractLockMutexTest.php +++ b/tests/Mutex/AbstractLockMutexTest.php @@ -36,7 +36,7 @@ public function testLockFails(): void ->method('lock') ->willThrowException(new LockAcquireException()); - $this->mutex->synchronized(static function (): void { + $this->mutex->synchronized(static function () { self::fail(); }); } @@ -49,7 +49,7 @@ public function testUnlockAfterCode(): void $this->mutex->expects(self::once()) ->method('unlock'); - $this->mutex->synchronized(static function (): void {}); + $this->mutex->synchronized(static function () {}); } /** diff --git a/tests/Mutex/DistributedMutexTest.php b/tests/Mutex/DistributedMutexTest.php index 2a618a3..3374136 100644 --- a/tests/Mutex/DistributedMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -95,7 +95,7 @@ public function testTooFewServerToAcquire(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('acquireMutex') - ->willReturnCallback(static function () use (&$i, $available): bool { + ->willReturnCallback(static function () use (&$i, $available) { if ($i++ < $available) { return true; } @@ -103,7 +103,7 @@ public function testTooFewServerToAcquire(int $count, int $available): void throw new LockAcquireException(); }); - $mutex->synchronized(static function (): void { + $mutex->synchronized(static function () { self::fail(); }); } @@ -127,7 +127,7 @@ public function testFaultTolerance(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('acquireMutex') - ->willReturnCallback(static function () use (&$i, $available): bool { + ->willReturnCallback(static function () use (&$i, $available) { if ($i++ < $available) { return true; } @@ -157,11 +157,11 @@ public function testAcquireTooFewKeys(int $count, int $available): void $i = 0; $mutex->expects(self::any()) ->method('acquireMutex') - ->willReturnCallback(static function () use (&$i, $available): bool { + ->willReturnCallback(static function () use (&$i, $available) { return ++$i <= $available; }); - $mutex->synchronized(static function (): void { + $mutex->synchronized(static function () { self::fail(); }); } @@ -188,13 +188,13 @@ public function testAcquireTimeouts(int $count, float $timeout, float $delay): v $mutex->expects(self::exactly($count)) ->method('acquireMutex') - ->willReturnCallback(static function () use ($delay): bool { + ->willReturnCallback(static function () use ($delay) { usleep((int) ($delay * 1e6)); return true; }); - $mutex->synchronized(static function (): void { + $mutex->synchronized(static function () { self::fail(); }); } @@ -227,11 +227,11 @@ public function testAcquireWithMajority(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('acquireMutex') - ->willReturnCallback(static function () use (&$i, $available): bool { + ->willReturnCallback(static function () use (&$i, $available) { return ++$i <= $available; }); - $mutex->synchronized(static function (): void {}); + $mutex->synchronized(static function () {}); } /** @@ -253,7 +253,7 @@ public function testTooFewServersToRelease(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('releaseMutex') - ->willReturnCallback(static function () use (&$i, $available): bool { + ->willReturnCallback(static function () use (&$i, $available) { if ($i++ < $available) { return true; } @@ -263,7 +263,7 @@ public function testTooFewServersToRelease(int $count, int $available): void $this->expectException(LockReleaseException::class); - $mutex->synchronized(static function (): void {}); + $mutex->synchronized(static function () {}); } /** @@ -285,13 +285,13 @@ public function testReleaseTooFewKeys(int $count, int $available): void $i = 0; $mutex->expects(self::exactly($count)) ->method('releaseMutex') - ->willReturnCallback(static function () use (&$i, $available): bool { + ->willReturnCallback(static function () use (&$i, $available) { return ++$i <= $available; }); $this->expectException(LockReleaseException::class); - $mutex->synchronized(static function (): void {}); + $mutex->synchronized(static function () {}); } /** diff --git a/tests/Mutex/FlockMutexTest.php b/tests/Mutex/FlockMutexTest.php index 96d5e50..06758d4 100644 --- a/tests/Mutex/FlockMutexTest.php +++ b/tests/Mutex/FlockMutexTest.php @@ -49,7 +49,7 @@ public function testCodeExecutedOutsideLockIsNotThrown(string $strategy): void { $this->mutex->strategy = $strategy; // @phpstan-ignore property.private - self::assertTrue($this->mutex->synchronized(static function (): bool { // @phpstan-ignore staticMethod.alreadyNarrowedType + self::assertTrue($this->mutex->synchronized(static function () { // @phpstan-ignore staticMethod.alreadyNarrowedType usleep(1100 * 1000); return true; @@ -108,7 +108,7 @@ public function testNoTimeoutWaitsForever(): void $timebox = new PcntlTimeout(1); $timebox->timeBoxed(function () { - $this->mutex->synchronized(static function (): void { + $this->mutex->synchronized(static function () { self::fail(); }); }); diff --git a/tests/Mutex/MemcachedMutexTest.php b/tests/Mutex/MemcachedMutexTest.php index c79e1b2..3cc94ff 100644 --- a/tests/Mutex/MemcachedMutexTest.php +++ b/tests/Mutex/MemcachedMutexTest.php @@ -43,7 +43,7 @@ public function testAcquireFail(): void ->with('php-malkusch-lock:test', true, 3) ->willReturn(false); - $this->mutex->synchronized(static function (): void { + $this->mutex->synchronized(static function () { self::fail(); }); } @@ -62,7 +62,7 @@ public function testReleaseFail(): void ->with('php-malkusch-lock:test') ->willReturn(false); - $this->mutex->synchronized(static function (): void {}); + $this->mutex->synchronized(static function () {}); } public function testAcquireExpireTimeoutLimit(): void @@ -79,6 +79,6 @@ public function testAcquireExpireTimeoutLimit(): void ->with('php-malkusch-lock:test') ->willReturn(true); - $this->mutex->synchronized(static function (): void {}); + $this->mutex->synchronized(static function () {}); } } diff --git a/tests/Mutex/MutexConcurrencyTest.php b/tests/Mutex/MutexConcurrencyTest.php index 3d027f2..76a39e9 100644 --- a/tests/Mutex/MutexConcurrencyTest.php +++ b/tests/Mutex/MutexConcurrencyTest.php @@ -80,10 +80,10 @@ public function testHighContention(\Closure $code, \Closure $mutexFactory, ?\Clo $iterations = 1000 / $concurrency; $timeout = $concurrency * 20; - $this->fork($concurrency, static function () use ($mutexFactory, $timeout, $iterations, $code): void { + $this->fork($concurrency, static function () use ($mutexFactory, $timeout, $iterations, $code) { $mutex = $mutexFactory($timeout); for ($i = 0; $i < $iterations; ++$i) { - $mutex->synchronized(static function () use ($code): void { + $mutex->synchronized(static function () use ($code) { $code(1); }); } @@ -104,7 +104,7 @@ public static function provideHighContentionCases(): iterable static::$temporaryFiles[] = $filename; yield $name => [ - static function (int $increment) use ($filename): int { + static function (int $increment) use ($filename) { $counter = file_get_contents($filename); $counter += $increment; @@ -113,7 +113,7 @@ static function (int $increment) use ($filename): int { return $counter; }, $mutexFactory, - static function () use ($filename): void { + static function () use ($filename) { file_put_contents($filename, '0'); }, ]; @@ -132,9 +132,9 @@ public function testExecutionIsSerializedWhenLocked(\Closure $mutexFactory): voi { $time = \microtime(true); - $this->fork(6, static function () use ($mutexFactory): void { + $this->fork(6, static function () use ($mutexFactory) { $mutex = $mutexFactory(3); - $mutex->synchronized(static function (): void { + $mutex->synchronized(static function () { \usleep(200 * 1000); }); }); @@ -154,14 +154,14 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable self::$temporaryFiles[] = $filename; - yield 'flock' => [static function ($timeout) use ($filename): Mutex { + yield 'flock' => [static function ($timeout) use ($filename) { $file = fopen($filename, 'w'); return new FlockMutex($file, $timeout); }]; if (extension_loaded('pcntl')) { - yield 'flockWithTimoutPcntl' => [static function ($timeout) use ($filename): Mutex { + yield 'flockWithTimoutPcntl' => [static function ($timeout) use ($filename) { $file = fopen($filename, 'w'); $lock = Liberator::liberate(new FlockMutex($file, $timeout)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_PCNTL, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -170,7 +170,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable }]; } - yield 'flockWithTimoutLoop' => [static function ($timeout) use ($filename): Mutex { + yield 'flockWithTimoutLoop' => [static function ($timeout) use ($filename) { $file = fopen($filename, 'w'); $lock = Liberator::liberate(new FlockMutex($file, $timeout)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_LOOP, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -179,7 +179,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable }]; if (extension_loaded('sysvsem')) { - yield 'semaphore' => [static function () use ($filename): Mutex { + yield 'semaphore' => [static function () use ($filename) { $semaphore = sem_get(ftok($filename, 'b')); self::assertThat( $semaphore, @@ -194,7 +194,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable } if (getenv('MEMCACHE_HOST')) { - yield 'memcached' => [static function ($timeout): Mutex { + yield 'memcached' => [static function ($timeout) { $memcached = new \Memcached(); $memcached->addServer(getenv('MEMCACHE_HOST'), 11211); @@ -205,7 +205,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable if (getenv('REDIS_URIS')) { $uris = explode(',', getenv('REDIS_URIS')); - yield 'DistributedMutex RedisMutex /w Predis' => [static function ($timeout) use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w Predis' => [static function ($timeout) use ($uris) { $clients = array_map( static fn ($uri) => new PredisClient($uri), $uris @@ -221,7 +221,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable if (class_exists(\Redis::class)) { yield 'DistributedMutex RedisMutex /w PHPRedis' => [ - static function ($timeout) use ($uris): Mutex { + static function ($timeout) use ($uris) { $clients = array_map( static function (string $uri): \Redis { $redis = new \Redis(); @@ -253,7 +253,7 @@ static function (string $uri): \Redis { } if (getenv('MYSQL_DSN')) { - yield 'MySQLMutex' => [static function ($timeout): Mutex { + yield 'MySQLMutex' => [static function ($timeout) { $pdo = new \PDO(getenv('MYSQL_DSN'), getenv('MYSQL_USER'), getenv('MYSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); @@ -262,7 +262,7 @@ static function (string $uri): \Redis { } if (getenv('PGSQL_DSN')) { - yield 'PostgreSQLMutex' => [static function (): Mutex { + yield 'PostgreSQLMutex' => [static function () { $pdo = new \PDO(getenv('PGSQL_DSN'), getenv('PGSQL_USER'), getenv('PGSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); diff --git a/tests/Mutex/MutexTest.php b/tests/Mutex/MutexTest.php index dc33917..d250eff 100644 --- a/tests/Mutex/MutexTest.php +++ b/tests/Mutex/MutexTest.php @@ -47,18 +47,18 @@ public static function setUpBeforeClass(): void */ public static function provideMutexFactoriesCases(): iterable { - yield 'NullMutex' => [static function (): Mutex { + yield 'NullMutex' => [static function () { return new NullMutex(); }]; - yield 'FlockMutex' => [static function (): Mutex { + yield 'FlockMutex' => [static function () { $file = fopen(vfsStream::url('test/lock'), 'w'); return new FlockMutex($file); }]; if (extension_loaded('pcntl')) { - yield 'flockWithTimoutPcntl' => [static function (): Mutex { + yield 'flockWithTimoutPcntl' => [static function () { $file = fopen(vfsStream::url('test/lock'), 'w'); $lock = Liberator::liberate(new FlockMutex($file, 3)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_PCNTL, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -67,7 +67,7 @@ public static function provideMutexFactoriesCases(): iterable }]; } - yield 'flockWithTimoutLoop' => [static function (): Mutex { + yield 'flockWithTimoutLoop' => [static function () { $file = fopen(vfsStream::url('test/lock'), 'w'); $lock = Liberator::liberate(new FlockMutex($file, 3)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_LOOP, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -76,12 +76,12 @@ public static function provideMutexFactoriesCases(): iterable }]; if (extension_loaded('sysvsem')) { - yield 'SemaphoreMutex' => [static function (): Mutex { + yield 'SemaphoreMutex' => [static function () { return new SemaphoreMutex(sem_get(ftok(__FILE__, 'a'))); }]; } - yield 'AbstractLockMutex' => [static function (): Mutex { + yield 'AbstractLockMutex' => [static function () { $lock = new class extends AbstractLockMutex { #[\Override] protected function lock(): void {} @@ -93,7 +93,7 @@ protected function unlock(): void {} return $lock; }]; - yield 'AbstractSpinlockMutex' => [static function (): Mutex { + yield 'AbstractSpinlockMutex' => [static function () { $lock = new class('test') extends AbstractSpinlockMutex { #[\Override] protected function acquire(string $key): bool @@ -112,7 +112,7 @@ protected function release(string $key): bool }]; if (getenv('MEMCACHE_HOST')) { - yield 'MemcachedMutex' => [static function (): Mutex { + yield 'MemcachedMutex' => [static function () { $memcached = new \Memcached(); $memcached->addServer(getenv('MEMCACHE_HOST'), 11211); @@ -123,7 +123,7 @@ protected function release(string $key): bool if (getenv('REDIS_URIS')) { $uris = explode(',', getenv('REDIS_URIS')); - yield 'DistributedMutex RedisMutex /w Predis' => [static function () use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w Predis' => [static function () use ($uris) { $clients = array_map( static fn ($uri) => new PredisClient($uri), $uris @@ -139,7 +139,7 @@ protected function release(string $key): bool if (class_exists(\Redis::class)) { yield 'DistributedMutex RedisMutex /w PHPRedis' => [ - static function () use ($uris): Mutex { + static function () use ($uris) { $clients = array_map( static function ($uri) { $redis = new \Redis(); @@ -171,7 +171,7 @@ static function ($uri) { } if (getenv('MYSQL_DSN')) { - yield 'MySQLMutex' => [static function (): Mutex { + yield 'MySQLMutex' => [static function () { $pdo = new \PDO(getenv('MYSQL_DSN'), getenv('MYSQL_USER'), getenv('MYSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); @@ -180,7 +180,7 @@ static function ($uri) { } if (getenv('PGSQL_DSN')) { - yield 'PostgreSQLMutex' => [static function (): Mutex { + yield 'PostgreSQLMutex' => [static function () { $pdo = new \PDO(getenv('PGSQL_DSN'), getenv('PGSQL_USER'), getenv('PGSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); @@ -200,7 +200,7 @@ static function ($uri) { public function testSynchronizedDelegates(\Closure $mutexFactory): void { $mutex = $mutexFactory(); - $result = $mutex->synchronized(static function (): string { + $result = $mutex->synchronized(static function () { return 'test'; }); self::assertSame('test', $result); diff --git a/tests/Mutex/PostgreSQLMutexTest.php b/tests/Mutex/PostgreSQLMutexTest.php index 2ebd43a..c2e0fc3 100644 --- a/tests/Mutex/PostgreSQLMutexTest.php +++ b/tests/Mutex/PostgreSQLMutexTest.php @@ -46,7 +46,7 @@ public function testAcquireLock(): void ->with(self::logicalAnd( new IsType(IsType::TYPE_ARRAY), self::countOf(2), - self::callback(function (...$arguments): bool { + self::callback(function (...$arguments) { if ($this->isPhpunit9x()) { // https://github.com/sebastianbergmann/phpunit/issues/5891 $arguments = $arguments[0]; } @@ -79,7 +79,7 @@ public function testReleaseLock(): void ->with(self::logicalAnd( new IsType(IsType::TYPE_ARRAY), self::countOf(2), - self::callback(function (...$arguments): bool { + self::callback(function (...$arguments) { if ($this->isPhpunit9x()) { // https://github.com/sebastianbergmann/phpunit/issues/5891 $arguments = $arguments[0]; } diff --git a/tests/Mutex/RedisMutexTest.php b/tests/Mutex/RedisMutexTest.php index 45a84d3..f2a6489 100644 --- a/tests/Mutex/RedisMutexTest.php +++ b/tests/Mutex/RedisMutexTest.php @@ -190,7 +190,7 @@ public function testAddFails(): void $this->closeMajorityConnections(); - $this->mutex->synchronized(static function (): void { + $this->mutex->synchronized(static function () { self::fail(); }); } @@ -202,7 +202,7 @@ public function testEvalScriptFails(): void { $this->expectException(LockReleaseException::class); - $this->mutex->synchronized(function (): void { + $this->mutex->synchronized(function () { $this->closeMajorityConnections(); }); } @@ -221,7 +221,7 @@ public function testSerializersAndCompressors(int $serializer, int $compressor): $connection->setOption(\Redis::OPT_COMPRESSION, $compressor); } - self::assertSame('test', $this->mutex->synchronized(static function (): string { + self::assertSame('test', $this->mutex->synchronized(static function () { return 'test'; })); } @@ -230,7 +230,7 @@ public function testResistantToPartialClusterFailuresForAcquiringLock(): void { $this->closeMinorityConnections(); - self::assertSame('test', $this->mutex->synchronized(static function (): string { + self::assertSame('test', $this->mutex->synchronized(static function () { return 'test'; })); } diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php index 70a1b7f..eba0b3e 100644 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ b/tests/Mutex/RedisMutexWithPredisTest.php @@ -56,7 +56,7 @@ public function testAddFailsToSetKey(): void $this->expectException(LockAcquireException::class); - $this->mutex->synchronized(static function (): void { + $this->mutex->synchronized(static function () { self::fail(); }); } @@ -91,7 +91,7 @@ public function testWorksNormally(): void ->willReturn(true); $executed = false; - $this->mutex->synchronized(static function () use (&$executed): void { + $this->mutex->synchronized(static function () use (&$executed) { $executed = true; }); @@ -112,7 +112,7 @@ public function testAcquireExpireTimeoutLimit(): void ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) ->willReturn(true); - $this->mutex->synchronized(static function (): void {}); + $this->mutex->synchronized(static function () {}); } /** @@ -133,7 +133,7 @@ public function testEvalScriptFails(): void $this->expectException(LockReleaseException::class); $executed = false; - $this->mutex->synchronized(static function () use (&$executed): void { + $this->mutex->synchronized(static function () use (&$executed) { $executed = true; }); diff --git a/tests/Util/DoubleCheckedLockingTest.php b/tests/Util/DoubleCheckedLockingTest.php index 2a848a9..491e9ff 100644 --- a/tests/Util/DoubleCheckedLockingTest.php +++ b/tests/Util/DoubleCheckedLockingTest.php @@ -28,11 +28,11 @@ public function testCheckFailsAcquiresNoLock(): void $this->mutex->expects(self::never()) ->method('synchronized'); - $checkedLocking = new DoubleCheckedLocking($this->mutex, static function (): bool { + $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () { return false; }); - $result = $checkedLocking->then(static function (): void { + $result = $checkedLocking->then(static function () { self::fail(); }); @@ -54,7 +54,7 @@ public function testLockedCheckAndExecution(): void return $result; }); - $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () use (&$lock, &$check): bool { + $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () use (&$lock, &$check) { if ($check === 1) { self::assertSame(1, $lock); } @@ -84,12 +84,10 @@ public function testCodeNotExecuted(\Closure $check): void { $this->mutex->expects(self::any()) ->method('synchronized') - ->willReturnCallback(static function (\Closure $block) { - return $block(); - }); + ->willReturnCallback(static fn (\Closure $block) => $block()); $checkedLocking = new DoubleCheckedLocking($this->mutex, $check); - $result = $checkedLocking->then(static function (): void { + $result = $checkedLocking->then(static function () { self::fail(); }); @@ -101,12 +99,12 @@ public function testCodeNotExecuted(\Closure $check): void */ public static function provideCodeNotExecutedCases(): iterable { - yield 'failFirstCheck' => [static function (): bool { + yield 'failFirstCheck' => [static function () { return false; }]; $checkCounter = 0; - yield 'failSecondCheck' => [static function () use (&$checkCounter): bool { + yield 'failSecondCheck' => [static function () use (&$checkCounter) { return $checkCounter++ === 0; }]; } @@ -115,11 +113,9 @@ public function testCodeExecuted(): void { $this->mutex->expects(self::once()) ->method('synchronized') - ->willReturnCallback(static function (\Closure $block) { - return $block(); - }); + ->willReturnCallback(static fn (\Closure $block) => $block()); - $checkedLocking = new DoubleCheckedLocking($this->mutex, static function (): bool { + $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () { return true; }); diff --git a/tests/Util/LoopTest.php b/tests/Util/LoopTest.php index 9fb020b..afeff11 100644 --- a/tests/Util/LoopTest.php +++ b/tests/Util/LoopTest.php @@ -47,7 +47,7 @@ public function testInvalidAcquireTimeout(float $acquireTimeout): void $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('The lock acquire timeout must be greater than or equal to 0.0 (' . LockUtil::getInstance()->formatTimeout($acquireTimeout) . ' was given)'); - $loop->execute(static function (): void { + $loop->execute(static function () { self::fail(); }, $acquireTimeout); } @@ -70,7 +70,7 @@ public static function provideInvalidAcquireTimeoutCases(): iterable public function testExecutionWithinAcquireTimeout(): void { $loop = new Loop(); - $loop->execute(static function () use ($loop): void { + $loop->execute(static function () use ($loop) { usleep(499 * 1000); $loop->end(); }, 0.5); @@ -82,7 +82,7 @@ public function testExecutionWithinAcquireTimeoutWithoutCallingEnd(): void $this->expectExceptionMessage('Lock acquire timeout of 0.5 seconds has been exceeded'); $loop = new Loop(); - $loop->execute(static function (): void { + $loop->execute(static function () { usleep(10 * 1000); }, 0.5); } @@ -94,7 +94,7 @@ public function testExecutionWithinAcquireTimeoutWithoutCallingEnd(): void public function testExceedAcquireTimeoutIsAcceptableIfEndWasCalled(): void { $loop = new Loop(); - $loop->execute(static function () use ($loop): void { + $loop->execute(static function () use ($loop) { usleep(501 * 1000); $loop->end(); }, 0.5); @@ -106,7 +106,7 @@ public function testExceedAcquireTimeoutWithoutCallingEnd(): void $this->expectExceptionMessage('Lock acquire timeout of 0.5 seconds has been exceeded'); $loop = new Loop(); - $loop->execute(static function (): void { + $loop->execute(static function () { usleep(501 * 1000); }, 0.5); } @@ -136,7 +136,7 @@ public function testEndCodeExecutedTwice(): void { $i = 0; $loop = new Loop(); - $loop->execute(static function () use ($loop, &$i): void { + $loop->execute(static function () use ($loop, &$i) { ++$i; if ($i > 1) { $loop->end(); From 0ff60e020c53167df68c56b7957e06eb199a7345 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 22:23:20 +0100 Subject: [PATCH 19/22] make README more readable --- README.md | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 0d813ec..6f73b48 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ **[Requirements](#requirements)** | **[Installation](#installation)** | **[Usage](#usage)** | +**[Implementations](#implementations)** | **[Authors](#authors)** | **[License](#license)** @@ -51,7 +52,7 @@ This library uses the namespace `Malkusch\Lock`. The [`Malkusch\Lock\Mutex\Mutex`][5] interface provides the base API for this library. -#### Mutex::synchronized() +### Mutex::synchronized() [`Malkusch\Lock\Mutex\Mutex::synchronized()`][6] executes code exclusively. This method guarantees that the code is only executed by one process at once. Other @@ -77,7 +78,7 @@ $newBalance = $mutex->synchronized(static function () use ($bankAccount, $amount }); ``` -#### Mutex::check() +### Mutex::check() [`Malkusch\Lock\Mutex\Mutex::check()`][7] sets a callable, which will be executed when [`Malkusch\Lock\Util\DoubleCheckedLocking::then()`][8] is called, @@ -122,7 +123,7 @@ if (!$newBalance) { } ``` -#### Extracting code result after lock release exception +### Extracting code result after lock release exception Mutex implementations based on [`Malkush\Lock\Mutex\AbstractLockMutex`][10] will throw [`Malkusch\Lock\Exception\LockReleaseException`][11] in case of lock release @@ -155,7 +156,7 @@ try { } ``` -### Implementations +## Implementations You can choose from one of the provided [`Malkusch\Lock\Mutex\Mutex`](#mutex) interface implementations or create/extend your own implementation. @@ -168,7 +169,7 @@ implementations or create/extend your own implementation. - [`PostgreSQLMutex`](#postgresqlmutex) - [`DistributedMutex`](#distributedmutex) -#### FlockMutex +### FlockMutex The **FlockMutex** is a lock implementation based on [`flock()`](https://php.net/manual/en/function.flock.php). @@ -181,7 +182,7 @@ $mutex = new FlockMutex(fopen(__FILE__, 'r')); Timeouts are supported as an optional second argument. This uses the `ext-pcntl` extension if possible or busy waiting if not. -#### MemcachedMutex +### MemcachedMutex The **MemcachedMutex** is a spinlock implementation which uses the [`Memcached` extension](https://php.net/manual/en/book.memcached.php). @@ -194,7 +195,7 @@ $memcached->addServer('localhost', 11211); $mutex = new MemcachedMutex('balance', $memcached); ``` -#### RedisMutex +### RedisMutex The **RedisMutex** is a lock implementation which supports the [`phpredis` extension](https://github.com/phpredis/phpredis) @@ -214,7 +215,7 @@ $redis->connect('localhost'); $mutex = new RedisMutex($redis, 'balance'); ``` -#### SemaphoreMutex +### SemaphoreMutex The **SemaphoreMutex** is a lock implementation based on [Semaphore](https://php.net/manual/en/ref.sem.php). @@ -225,7 +226,7 @@ $semaphore = sem_get(ftok(__FILE__, 'a')); $mutex = new SemaphoreMutex($semaphore); ``` -#### MySQLMutex +### MySQLMutex The **MySQLMutex** uses MySQL's [`GET_LOCK`](https://dev.mysql.com/doc/refman/9.0/en/locking-functions.html#function_get-lock) @@ -248,7 +249,7 @@ $pdo = new \PDO('mysql:host=localhost;dbname=test', 'username'); $mutex = new MySQLMutex($pdo, 'balance', 15); ``` -#### PostgreSQLMutex +### PostgreSQLMutex The **PostgreSQLMutex** uses PostgreSQL's [advisory locking](https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS) @@ -265,7 +266,7 @@ $pdo = new \PDO('pgsql:host=localhost;dbname=test', 'username'); $mutex = new PostgreSQLMutex($pdo, 'balance'); ``` -#### DistributedMutex +### DistributedMutex The **DistributedMutex** is the distributed lock implementation of [RedLock](https://redis.io/topics/distlock#the-redlock-algorithm) which supports From 60e6f2740c4bac4dd54ff28ce11e1f93b9faf81d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 22:57:27 +0100 Subject: [PATCH 20/22] move redis limit test --- tests/Mutex/RedisMutexTest.php | 34 ++++++++++++++++++++++++ tests/Mutex/RedisMutexWithPredisTest.php | 21 ++------------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/tests/Mutex/RedisMutexTest.php b/tests/Mutex/RedisMutexTest.php index f2a6489..13ffb04 100644 --- a/tests/Mutex/RedisMutexTest.php +++ b/tests/Mutex/RedisMutexTest.php @@ -11,7 +11,9 @@ use Malkusch\Lock\Mutex\RedisMutex; use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\Attributes\RequiresPhpExtension; +use PHPUnit\Framework\Constraint\IsType; use PHPUnit\Framework\TestCase; +use Predis\ClientInterface as PredisClientInterface; if (\PHP_MAJOR_VERSION >= 8) { trait RedisCompatibilityTrait @@ -57,6 +59,19 @@ public function set($key, $value, $options = null) } } +interface PredisClientInterfaceWithSetAndEvalMethods extends PredisClientInterface +{ + /** + * @return mixed + */ + public function eval(); + + /** + * @return mixed + */ + public function set(); +} + /** * These tests require the environment variable: * @@ -207,6 +222,25 @@ public function testEvalScriptFails(): void }); } + public function testAcquireExpireTimeoutLimit(): void + { + $client = $this->createMock(PredisClientInterfaceWithSetAndEvalMethods::class); + + $this->mutex = new RedisMutex($client, 'test'); + + $client->expects(self::once()) + ->method('set') + ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 31_557_600_000_000, 'NX') + ->willReturnSelf(); + + $client->expects(self::once()) + ->method('eval') + ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) + ->willReturn(true); + + $this->mutex->synchronized(static function () {}); + } + /** * @param \Redis::SERIALIZER_* $serializer * @param \Redis::COMPRESSION_* $compressor diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php index eba0b3e..7b008c4 100644 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ b/tests/Mutex/RedisMutexWithPredisTest.php @@ -13,7 +13,7 @@ use Predis\ClientInterface as PredisClientInterface; use Predis\PredisException; -interface PredisClientInterfaceWithSetAndEvalMethods extends PredisClientInterface +interface PredisClientInterfaceWithSetAndEvalMethods2 extends PredisClientInterface { /** * @return mixed @@ -39,7 +39,7 @@ protected function setUp(): void { parent::setUp(); - $this->client = $this->createMock(PredisClientInterfaceWithSetAndEvalMethods::class); + $this->client = $this->createMock(PredisClientInterfaceWithSetAndEvalMethods2::class); $this->mutex = new RedisMutex($this->client, 'test', 2.5, 3.5); } @@ -98,23 +98,6 @@ public function testWorksNormally(): void self::assertTrue($executed); } - public function testAcquireExpireTimeoutLimit(): void - { - $this->mutex = new RedisMutex($this->client, 'test'); - - $this->client->expects(self::once()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 31_557_600_000_000, 'NX') - ->willReturnSelf(); - - $this->client->expects(self::once()) - ->method('eval') - ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) - ->willReturn(true); - - $this->mutex->synchronized(static function () {}); - } - /** * Tests evalScript() fails. */ From 4b946fe92610b85b01fe45c71651bf27602ee294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Mon, 9 Dec 2024 23:28:57 +0100 Subject: [PATCH 21/22] test exceptions better --- tests/Mutex/AbstractLockMutexTest.php | 9 +++------ tests/Mutex/AbstractSpinlockMutexTest.php | 11 ++++------- .../AbstractSpinlockWithTokenMutexTest.php | 1 - tests/Mutex/DistributedMutexTest.php | 17 ++++++----------- tests/Mutex/FlockMutexTest.php | 9 ++++----- tests/Mutex/MemcachedMutexTest.php | 6 ++---- tests/Mutex/MutexTest.php | 4 ++-- tests/Mutex/RedisMutexTest.php | 6 ++---- tests/Mutex/RedisMutexWithPredisTest.php | 19 +------------------ 9 files changed, 24 insertions(+), 58 deletions(-) diff --git a/tests/Mutex/AbstractLockMutexTest.php b/tests/Mutex/AbstractLockMutexTest.php index 731c51f..50fcb5f 100644 --- a/tests/Mutex/AbstractLockMutexTest.php +++ b/tests/Mutex/AbstractLockMutexTest.php @@ -30,12 +30,11 @@ protected function setUp(): void */ public function testLockFails(): void { - $this->expectException(LockAcquireException::class); - $this->mutex->expects(self::once()) ->method('lock') ->willThrowException(new LockAcquireException()); + $this->expectException(LockAcquireException::class); $this->mutex->synchronized(static function () { self::fail(); }); @@ -71,12 +70,11 @@ public function testUnlockAfterException(): void */ public function testUnlockFailsAfterCode(): void { - $this->expectException(LockReleaseException::class); - $this->mutex->expects(self::once()) ->method('unlock') ->willThrowException(new LockReleaseException()); + $this->expectException(LockReleaseException::class); $this->mutex->synchronized(static function () {}); } @@ -85,12 +83,11 @@ public function testUnlockFailsAfterCode(): void */ public function testUnlockFailsAfterException(): void { - $this->expectException(LockReleaseException::class); - $this->mutex->expects(self::once()) ->method('unlock') ->willThrowException(new LockReleaseException()); + $this->expectException(LockReleaseException::class); $this->mutex->synchronized(static function () { throw new \DomainException(); }); diff --git a/tests/Mutex/AbstractSpinlockMutexTest.php b/tests/Mutex/AbstractSpinlockMutexTest.php index 0e04e62..9faa11e 100644 --- a/tests/Mutex/AbstractSpinlockMutexTest.php +++ b/tests/Mutex/AbstractSpinlockMutexTest.php @@ -53,13 +53,12 @@ private function createSpinlockMutexMock(float $acquireTimeout = 3): AbstractSpi */ public function testFailAcquireLock(): void { - $this->expectException(LockAcquireException::class); - $mutex = $this->createSpinlockMutexMock(); $mutex->expects(self::any()) ->method('acquire') ->willThrowException(new LockAcquireException()); + $this->expectException(LockAcquireException::class); $mutex->synchronized(static function () { self::fail(); }); @@ -70,14 +69,13 @@ public function testFailAcquireLock(): void */ public function testAcquireTimeouts(): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of 3.0 seconds has been exceeded'); - $mutex = $this->createSpinlockMutexMock(); $mutex->expects(self::atLeastOnce()) ->method('acquire') ->willReturn(false); + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of 3.0 seconds has been exceeded'); $mutex->synchronized(static function () { self::fail(); }); @@ -106,8 +104,6 @@ public function testExecuteBarelySucceeds(): void */ public function testFailReleasingLock(): void { - $this->expectException(LockReleaseException::class); - $mutex = $this->createSpinlockMutexMock(); $mutex->expects(self::any()) ->method('acquire') @@ -116,6 +112,7 @@ public function testFailReleasingLock(): void ->method('release') ->willReturn(false); + $this->expectException(LockReleaseException::class); $mutex->synchronized(static function () {}); } } diff --git a/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php b/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php index 38810c5..34cb69a 100644 --- a/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php +++ b/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php @@ -78,7 +78,6 @@ public function testExecuteTooLong(): void $this->expectException(ExecutionOutsideLockException::class); $this->expectExceptionMessageMatches('~^The code executed for 0\.2\d+ seconds\. But the expire timeout is 0\.2 seconds. The last 0\.0\d+ seconds were executed outside of the lock\.$~'); - $mutex->synchronized(static function () { usleep(201 * 1000); }); diff --git a/tests/Mutex/DistributedMutexTest.php b/tests/Mutex/DistributedMutexTest.php index 3374136..6381c21 100644 --- a/tests/Mutex/DistributedMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -87,9 +87,6 @@ function (int $i) { #[DataProvider('provideMinorityCases')] public function testTooFewServerToAcquire(int $count, int $available): void { - $this->expectException(LockAcquireException::class); - $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); - $mutex = $this->createDistributedMutexMock($count); $i = 0; @@ -103,6 +100,8 @@ public function testTooFewServerToAcquire(int $count, int $available): void throw new LockAcquireException(); }); + $this->expectException(LockAcquireException::class); + $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); $mutex->synchronized(static function () { self::fail(); }); @@ -149,9 +148,6 @@ public function testFaultTolerance(int $count, int $available): void #[DataProvider('provideMinorityCases')] public function testAcquireTooFewKeys(int $count, int $available): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); - $mutex = $this->createDistributedMutexMock($count); $i = 0; @@ -161,6 +157,8 @@ public function testAcquireTooFewKeys(int $count, int $available): void return ++$i <= $available; }); + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); $mutex->synchronized(static function () { self::fail(); }); @@ -178,9 +176,6 @@ public function testAcquireTooFewKeys(int $count, int $available): void #[DataProvider('provideAcquireTimeoutsCases')] public function testAcquireTimeouts(int $count, float $timeout, float $delay): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of ' . LockUtil::getInstance()->formatTimeout($timeout) . ' seconds has been exceeded'); - $mutex = $this->createDistributedMutexMock($count, $timeout, $timeout); $mutex->expects(self::exactly($count)) ->method('releaseMutex') @@ -194,6 +189,8 @@ public function testAcquireTimeouts(int $count, float $timeout, float $delay): v return true; }); + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of ' . LockUtil::getInstance()->formatTimeout($timeout) . ' seconds has been exceeded'); $mutex->synchronized(static function () { self::fail(); }); @@ -262,7 +259,6 @@ public function testTooFewServersToRelease(int $count, int $available): void }); $this->expectException(LockReleaseException::class); - $mutex->synchronized(static function () {}); } @@ -290,7 +286,6 @@ public function testReleaseTooFewKeys(int $count, int $available): void }); $this->expectException(LockReleaseException::class); - $mutex->synchronized(static function () {}); } diff --git a/tests/Mutex/FlockMutexTest.php b/tests/Mutex/FlockMutexTest.php index 06758d4..cd147d1 100644 --- a/tests/Mutex/FlockMutexTest.php +++ b/tests/Mutex/FlockMutexTest.php @@ -64,14 +64,13 @@ public function testCodeExecutedOutsideLockIsNotThrown(string $strategy): void #[DataProvider('provideTimeoutableStrategiesCases')] public function testAcquireTimeoutOccurs(string $strategy): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); - $anotherResource = fopen($this->file, 'r'); flock($anotherResource, \LOCK_EX); $this->mutex->strategy = $strategy; // @phpstan-ignore property.private + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); try { $this->mutex->synchronized(static function () { self::fail(); @@ -99,14 +98,14 @@ public static function provideTimeoutableStrategiesCases(): iterable #[RequiresPhpExtension('pcntl')] public function testNoTimeoutWaitsForever(): void { - $this->expectException(DeadlineException::class); - $anotherResource = fopen($this->file, 'r'); flock($anotherResource, \LOCK_EX); $this->mutex->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_BLOCK, null, FlockMutex::class)(); // @phpstan-ignore property.private $timebox = new PcntlTimeout(1); + + $this->expectException(DeadlineException::class); $timebox->timeBoxed(function () { $this->mutex->synchronized(static function () { self::fail(); diff --git a/tests/Mutex/MemcachedMutexTest.php b/tests/Mutex/MemcachedMutexTest.php index 3cc94ff..e836ed2 100644 --- a/tests/Mutex/MemcachedMutexTest.php +++ b/tests/Mutex/MemcachedMutexTest.php @@ -36,13 +36,12 @@ protected function setUp(): void public function testAcquireFail(): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->memcached->expects(self::atLeastOnce()) ->method('add') ->with('php-malkusch-lock:test', true, 3) ->willReturn(false); + $this->expectException(LockAcquireTimeoutException::class); $this->mutex->synchronized(static function () { self::fail(); }); @@ -50,8 +49,6 @@ public function testAcquireFail(): void public function testReleaseFail(): void { - $this->expectException(LockReleaseException::class); - $this->memcached->expects(self::once()) ->method('add') ->with('php-malkusch-lock:test', true, 3) @@ -62,6 +59,7 @@ public function testReleaseFail(): void ->with('php-malkusch-lock:test') ->willReturn(false); + $this->expectException(LockReleaseException::class); $this->mutex->synchronized(static function () {}); } diff --git a/tests/Mutex/MutexTest.php b/tests/Mutex/MutexTest.php index d250eff..84880d2 100644 --- a/tests/Mutex/MutexTest.php +++ b/tests/Mutex/MutexTest.php @@ -235,9 +235,9 @@ public function testRelease(\Closure $mutexFactory): void #[DataProvider('provideMutexFactoriesCases')] public function testSynchronizedPassesExceptionThrough(\Closure $mutexFactory): void { - $this->expectException(\DomainException::class); - $mutex = $mutexFactory(); + + $this->expectException(\DomainException::class); $mutex->synchronized(static function () { throw new \DomainException(); }); diff --git a/tests/Mutex/RedisMutexTest.php b/tests/Mutex/RedisMutexTest.php index 13ffb04..a49136b 100644 --- a/tests/Mutex/RedisMutexTest.php +++ b/tests/Mutex/RedisMutexTest.php @@ -200,11 +200,10 @@ private function closeMinorityConnections(): void public function testAddFails(): void { - $this->expectException(LockAcquireException::class); - $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); - $this->closeMajorityConnections(); + $this->expectException(LockAcquireException::class); + $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); $this->mutex->synchronized(static function () { self::fail(); }); @@ -216,7 +215,6 @@ public function testAddFails(): void public function testEvalScriptFails(): void { $this->expectException(LockReleaseException::class); - $this->mutex->synchronized(function () { $this->closeMajorityConnections(); }); diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php index 7b008c4..875cff3 100644 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ b/tests/Mutex/RedisMutexWithPredisTest.php @@ -44,9 +44,6 @@ protected function setUp(): void $this->mutex = new RedisMutex($this->client, 'test', 2.5, 3.5); } - /** - * Tests add() fails. - */ public function testAddFailsToSetKey(): void { $this->client->expects(self::atLeastOnce()) @@ -55,15 +52,11 @@ public function testAddFailsToSetKey(): void ->willReturn(null); $this->expectException(LockAcquireException::class); - $this->mutex->synchronized(static function () { self::fail(); }); } - /** - * Tests add() errors. - */ public function testAddErrors(): void { $this->client->expects(self::atLeastOnce()) @@ -72,7 +65,6 @@ public function testAddErrors(): void ->willThrowException($this->createMock(PredisException::class)); $this->expectException(LockAcquireException::class); - $this->mutex->synchronized(static function () { self::fail(); }); @@ -98,9 +90,6 @@ public function testWorksNormally(): void self::assertTrue($executed); } - /** - * Tests evalScript() fails. - */ public function testEvalScriptFails(): void { $this->client->expects(self::atLeastOnce()) @@ -114,12 +103,6 @@ public function testEvalScriptFails(): void ->willThrowException($this->createMock(PredisException::class)); $this->expectException(LockReleaseException::class); - - $executed = false; - $this->mutex->synchronized(static function () use (&$executed) { - $executed = true; - }); - - self::assertTrue($executed); + $this->mutex->synchronized(static function () {}); } } From f584009152d59e839ddc00881628f88446727267 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Vo=C5=99=C3=AD=C5=A1ek?= Date: Tue, 10 Dec 2024 00:02:39 +0100 Subject: [PATCH 22/22] add logger tests --- src/Mutex/DistributedMutex.php | 5 +- tests/Mutex/DistributedMutexTest.php | 48 ++++++++++ tests/Mutex/RedisMutexWithPredisTest.php | 108 ----------------------- 3 files changed, 49 insertions(+), 112 deletions(-) delete mode 100644 tests/Mutex/RedisMutexWithPredisTest.php diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php index c9b2ab1..4da4b08 100644 --- a/src/Mutex/DistributedMutex.php +++ b/src/Mutex/DistributedMutex.php @@ -27,9 +27,6 @@ class DistributedMutex extends AbstractSpinlockWithTokenMutex implements LoggerA private ?array $lockedMutexIndexes = null; /** - * The Redis instance needs to be connected. I.e. Redis::connect() was - * called already. - * * @param array $mutexes * @param float $acquireTimeout In seconds * @param float $expireTimeout In seconds @@ -91,7 +88,7 @@ protected function acquireWithToken(string $key, float $expireTimeout) assert($exception !== null); // The last exception for some context. throw new LockAcquireException( - 'It\'s not possible to acquire a lock because at least half of the Redis server are not available', + 'It is not possible to acquire a lock because at least half of the servers are not available', LockAcquireException::CODE_REDLOCK_NOT_ENOUGH_SERVERS, $exception ); diff --git a/tests/Mutex/DistributedMutexTest.php b/tests/Mutex/DistributedMutexTest.php index 6381c21..97f83a3 100644 --- a/tests/Mutex/DistributedMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -8,6 +8,7 @@ use Malkusch\Lock\Exception\LockAcquireTimeoutException; use Malkusch\Lock\Exception\LockReleaseException; use Malkusch\Lock\Exception\MutexException; +use Malkusch\Lock\Mutex\AbstractSpinlockMutex; use Malkusch\Lock\Mutex\AbstractSpinlockWithTokenMutex; use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Util\LockUtil; @@ -17,6 +18,8 @@ use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use Predis\PredisException; +use Psr\Log\LoggerInterface; class DistributedMutexTest extends TestCase { @@ -319,4 +322,49 @@ public static function provideMajorityCases(): iterable yield [3, 3]; yield [4, 3]; } + + public function testAcquireMutexLogger(): void + { + $mutex = $this->createDistributedMutexMock(3); + $logger = $this->createMock(LoggerInterface::class); + $mutex->setLogger($logger); + + $mutex->expects(self::exactly(3)) + ->method('acquireMutex') + ->with(self::isInstanceOf(AbstractSpinlockMutex::class), 'php-malkusch-lock:', 1.0, \INF) + ->willThrowException($this->createMock(/* PredisException::class */ LockAcquireException::class)); + + $logger->expects(self::exactly(3)) + ->method('warning') + ->with('Could not set {key} = {token} at server #{index}', self::anything()); + + $this->expectException(LockAcquireException::class); + $this->expectExceptionMessage('It is not possible to acquire a lock because at least half of the servers are not available'); + $mutex->synchronized(static function () { + self::fail(); + }); + } + + public function testReleaseMutexLogger(): void + { + $mutex = $this->createDistributedMutexMock(3); + $logger = $this->createMock(LoggerInterface::class); + $mutex->setLogger($logger); + + $mutex->expects(self::exactly(3)) + ->method('acquireMutex') + ->willReturn(true); + + $mutex->expects(self::exactly(3)) + ->method('releaseMutex') + ->with(self::isInstanceOf(AbstractSpinlockMutex::class), 'php-malkusch-lock:', \INF) + ->willThrowException($this->createMock(/* PredisException::class */ LockReleaseException::class)); + + $logger->expects(self::exactly(3)) + ->method('warning') + ->with('Could not unset {key} = {token} at server #{index}', self::anything()); + + $this->expectException(LockReleaseException::class); + $mutex->synchronized(static function () {}); + } } diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php deleted file mode 100644 index 875cff3..0000000 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ /dev/null @@ -1,108 +0,0 @@ -client = $this->createMock(PredisClientInterfaceWithSetAndEvalMethods2::class); - - $this->mutex = new RedisMutex($this->client, 'test', 2.5, 3.5); - } - - public function testAddFailsToSetKey(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willReturn(null); - - $this->expectException(LockAcquireException::class); - $this->mutex->synchronized(static function () { - self::fail(); - }); - } - - public function testAddErrors(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willThrowException($this->createMock(PredisException::class)); - - $this->expectException(LockAcquireException::class); - $this->mutex->synchronized(static function () { - self::fail(); - }); - } - - public function testWorksNormally(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willReturnSelf(); - - $this->client->expects(self::once()) - ->method('eval') - ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) - ->willReturn(true); - - $executed = false; - $this->mutex->synchronized(static function () use (&$executed) { - $executed = true; - }); - - self::assertTrue($executed); - } - - public function testEvalScriptFails(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willReturnSelf(); - - $this->client->expects(self::once()) - ->method('eval') - ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) - ->willThrowException($this->createMock(PredisException::class)); - - $this->expectException(LockReleaseException::class); - $this->mutex->synchronized(static function () {}); - } -}