diff --git a/README.md b/README.md index 5611e34..6f73b48 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ **[Requirements](#requirements)** | **[Installation](#installation)** | **[Usage](#usage)** | +**[Implementations](#implementations)** | **[Authors](#authors)** | **[License](#license)** @@ -51,7 +52,7 @@ This library uses the namespace `Malkusch\Lock`. The [`Malkusch\Lock\Mutex\Mutex`][5] interface provides the base API for this library. -#### Mutex::synchronized() +### Mutex::synchronized() [`Malkusch\Lock\Mutex\Mutex::synchronized()`][6] executes code exclusively. This method guarantees that the code is only executed by one process at once. Other @@ -65,10 +66,7 @@ return value `false` or `null` should be seen as a failed action. Example: ```php -$newBalance = $mutex->synchronized(function () use ( - $bankAccount, - $amount -): int { +$newBalance = $mutex->synchronized(static function () use ($bankAccount, $amount) { $balance = $bankAccount->getBalance(); $balance -= $amount; if ($balance < 0) { @@ -80,7 +78,7 @@ $newBalance = $mutex->synchronized(function () use ( }); ``` -#### Mutex::check() +### Mutex::check() [`Malkusch\Lock\Mutex\Mutex::check()`][7] sets a callable, which will be executed when [`Malkusch\Lock\Util\DoubleCheckedLocking::then()`][8] is called, @@ -108,9 +106,9 @@ this return value will not be checked by the library. Example: ```php -$newBalance = $mutex->check(function () use ($bankAccount, $amount): bool { +$newBalance = $mutex->check(static function () use ($bankAccount, $amount): bool { return $bankAccount->getBalance() >= $amount; -})->then(function () use ($bankAccount, $amount): int { +})->then(static function () use ($bankAccount, $amount) { $balance = $bankAccount->getBalance(); $balance -= $amount; $bankAccount->setBalance($balance); @@ -118,14 +116,14 @@ $newBalance = $mutex->check(function () use ($bankAccount, $amount): bool { return $balance; }); -if ($newBalance === false) { +if (!$newBalance) { if ($balance < 0) { throw new \DomainException('You have no credit'); } } ``` -#### Extracting code result after lock release exception +### Extracting code result after lock release exception Mutex implementations based on [`Malkush\Lock\Mutex\AbstractLockMutex`][10] will throw [`Malkusch\Lock\Exception\LockReleaseException`][11] in case of lock release @@ -136,8 +134,8 @@ In order to read the code result (or an exception thrown there), Example: ```php try { - // or $mutex->check(...) - $result = $mutex->synchronized(function () { + // OR $mutex->check(...) + $result = $mutex->synchronized(static function () { if (someCondition()) { throw new \DomainException(); } @@ -149,7 +147,7 @@ try { $codeException = $unlockException->getCodeException(); // do something with the code exception } else { - $code_result = $unlockException->getCodeResult(); + $codeResult = $unlockException->getCodeResult(); // do something with the code result } @@ -158,7 +156,7 @@ try { } ``` -### Implementations +## Implementations You can choose from one of the provided [`Malkusch\Lock\Mutex\Mutex`](#mutex) interface implementations or create/extend your own implementation. @@ -168,9 +166,10 @@ implementations or create/extend your own implementation. - [`RedisMutex`](#redismutex) - [`SemaphoreMutex`](#semaphoremutex) - [`MySQLMutex`](#mysqlmutex) -- [`PostgreSQLMutex`](#PostgreSQLMutex) +- [`PostgreSQLMutex`](#postgresqlmutex) +- [`DistributedMutex`](#distributedmutex) -#### FlockMutex +### FlockMutex The **FlockMutex** is a lock implementation based on [`flock()`](https://php.net/manual/en/function.flock.php). @@ -178,20 +177,12 @@ The **FlockMutex** is a lock implementation based on Example: ```php $mutex = new FlockMutex(fopen(__FILE__, 'r')); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` Timeouts are supported as an optional second argument. This uses the `ext-pcntl` extension if possible or busy waiting if not. -#### MemcachedMutex +### MemcachedMutex The **MemcachedMutex** is a spinlock implementation which uses the [`Memcached` extension](https://php.net/manual/en/book.memcached.php). @@ -202,22 +193,13 @@ $memcached = new \Memcached(); $memcached->addServer('localhost', 11211); $mutex = new MemcachedMutex('balance', $memcached); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` -#### RedisMutex +### RedisMutex -The **RedisMutex** is the distributed lock implementation of -[RedLock](https://redis.io/topics/distlock#the-redlock-algorithm) which supports the +The **RedisMutex** is a lock implementation which supports the [`phpredis` extension](https://github.com/phpredis/phpredis) -or [`Predis` API](https://github.com/nrk/predis). +or [`Predis` API](https://github.com/nrk/predis) clients. Both Redis and Valkey servers are supported. @@ -230,18 +212,10 @@ $redis = new \Redis(); $redis->connect('localhost'); // OR $redis = new \Predis\Client('redis://localhost'); -$mutex = new RedisMutex([$redis], 'balance'); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); +$mutex = new RedisMutex($redis, 'balance'); ``` -#### SemaphoreMutex +### SemaphoreMutex The **SemaphoreMutex** is a lock implementation based on [Semaphore](https://php.net/manual/en/ref.sem.php). @@ -250,17 +224,9 @@ Example: ```php $semaphore = sem_get(ftok(__FILE__, 'a')); $mutex = new SemaphoreMutex($semaphore); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` -#### MySQLMutex +### MySQLMutex The **MySQLMutex** uses MySQL's [`GET_LOCK`](https://dev.mysql.com/doc/refman/9.0/en/locking-functions.html#function_get-lock) @@ -280,19 +246,10 @@ you to namespace your locks like `dbname.lockname`. ```php $pdo = new \PDO('mysql:host=localhost;dbname=test', 'username'); - $mutex = new MySQLMutex($pdo, 'balance', 15); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); ``` -#### PostgreSQLMutex +### PostgreSQLMutex The **PostgreSQLMutex** uses PostgreSQL's [advisory locking](https://www.postgresql.org/docs/9.4/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS) @@ -306,16 +263,21 @@ interrupted, the lock is automatically released. ```php $pdo = new \PDO('pgsql:host=localhost;dbname=test', 'username'); - $mutex = new PostgreSQLMutex($pdo, 'balance'); -$mutex->synchronized(function () use ($bankAccount, $amount) { - $balance = $bankAccount->getBalance(); - $balance -= $amount; - if ($balance < 0) { - throw new \DomainException('You have no credit'); - } - $bankAccount->setBalance($balance); -}); +``` + +### DistributedMutex + +The **DistributedMutex** is the distributed lock implementation of +[RedLock](https://redis.io/topics/distlock#the-redlock-algorithm) which supports +one or more [`Malkush\Lock\Mutex\AbstractSpinlockMutex`][10] instances. + +Example: +```php +$mutex = new DistributedMutex([ + new \Predis\Client('redis://10.0.0.1'), + new \Predis\Client('redis://10.0.0.2'), +], 'balance'); ``` ## Authors @@ -341,3 +303,4 @@ This project is free and is licensed under the MIT. [9]: https://en.wikipedia.org/wiki/Double-checked_locking [10]: https://github.com/php-lock/lock/blob/3ca295ccda/src/Mutex/AbstractLockMutex.php [11]: https://github.com/php-lock/lock/blob/3ca295ccda/src/Exception/LockReleaseException.php +[12]: https://github.com/php-lock/lock/blob/41509dda0a/src/Mutex/AbstractSpinlockMutex.php#L15 diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 872f626..756c97d 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -11,11 +11,6 @@ parameters: ignoreErrors: # TODO - - - path: 'src/Mutex/AbstractRedlockMutex.php' - identifier: if.condNotBoolean - message: '~^Only booleans are allowed in an if condition, mixed given\.$~' - count: 1 - path: 'tests/Mutex/*Test.php' identifier: empty.notAllowed diff --git a/src/Mutex/AbstractRedlockMutex.php b/src/Mutex/AbstractRedlockMutex.php deleted file mode 100644 index 5929e00..0000000 --- a/src/Mutex/AbstractRedlockMutex.php +++ /dev/null @@ -1,169 +0,0 @@ - */ - private array $clients; - - /** - * The Redis instance needs to be connected. I.e. Redis::connect() was - * called already. - * - * @param array $clients - * @param float $acquireTimeout In seconds - * @param float $expireTimeout In seconds - */ - public function __construct(array $clients, string $name, float $acquireTimeout = 3, float $expireTimeout = \INF) - { - parent::__construct($name, $acquireTimeout, $expireTimeout); - - $this->clients = $clients; - $this->logger = new NullLogger(); - } - - #[\Override] - protected function acquireWithToken(string $key, float $expireTimeout) - { - // 1. This differs from the specification to avoid an overflow on 32-Bit systems. - $startTs = microtime(true); - - // 2. - $acquired = 0; - $errored = 0; - $token = LockUtil::getInstance()->makeRandomToken(); - $exception = null; - foreach ($this->clients as $index => $client) { - try { - if ($this->add($client, $key, $token, $expireTimeout)) { - ++$acquired; - } - } catch (LockAcquireException $exception) { - // todo if there is only one redis server, throw immediately. - $context = [ - 'key' => $key, - 'index' => $index, - 'token' => $token, - 'exception' => $exception, - ]; - $this->logger->warning('Could not set {key} = {token} at server #{index}', $context); - - ++$errored; - } - } - - // 3. - $elapsedTime = microtime(true) - $startTs; - $isAcquired = $this->isMajority($acquired) && $elapsedTime <= $expireTimeout; - - if ($isAcquired) { - // 4. - return $token; - } - - // 5. - $this->releaseWithToken($key, $token); - - // In addition to RedLock it's an exception if too many servers fail. - if (!$this->isMajority(count($this->clients) - $errored)) { - assert($exception !== null); // The last exception for some context. - - throw new LockAcquireException( - 'It\'s not possible to acquire a lock because at least half of the Redis server are not available', - LockAcquireException::CODE_REDLOCK_NOT_ENOUGH_SERVERS, - $exception - ); - } - - return false; - } - - #[\Override] - protected function releaseWithToken(string $key, string $token): bool - { - /* - * All Redis commands must be analyzed before execution to determine which keys the command will operate on. In - * order for this to be true for EVAL, keys must be passed explicitly. - * - * @link https://redis.io/commands/set - */ - $script = <<<'EOD' - if redis.call("get", KEYS[1]) == ARGV[1] then - return redis.call("del", KEYS[1]) - else - return 0 - end - EOD; - $released = 0; - foreach ($this->clients as $index => $client) { - try { - if ($this->evalScript($client, $script, [$key], [$token])) { - ++$released; - } - } catch (LockReleaseException $e) { - // todo throw if there is only one redis server - $context = [ - 'key' => $key, - 'index' => $index, - 'token' => $token, - 'exception' => $e, - ]; - $this->logger->warning('Could not unset {key} = {token} at server #{index}', $context); - } - } - - return $this->isMajority($released); - } - - /** - * Returns if a count is the majority of all servers. - * - * @return bool True if the count is the majority - */ - private function isMajority(int $count): bool - { - return $count > count($this->clients) / 2; - } - - /** - * Sets the key only if such key doesn't exist at the server yet. - * - * @param TClient $client - * @param float $expire The TTL seconds - * - * @return bool True if the key was set - */ - abstract protected function add(object $client, string $key, string $value, float $expire): bool; - - /** - * @param TClient $client - * @param list $keys - * @param list $arguments - * - * @return mixed The script result, or false if executing failed - * - * @throws LockReleaseException An unexpected error happened - */ - abstract protected function evalScript(object $client, string $luaScript, array $keys, array $arguments); -} diff --git a/src/Mutex/DistributedMutex.php b/src/Mutex/DistributedMutex.php new file mode 100644 index 0000000..4da4b08 --- /dev/null +++ b/src/Mutex/DistributedMutex.php @@ -0,0 +1,182 @@ + */ + private array $mutexes; + + /** @var list */ + private ?array $lockedMutexIndexes = null; + + /** + * @param array $mutexes + * @param float $acquireTimeout In seconds + * @param float $expireTimeout In seconds + */ + public function __construct(array $mutexes, float $acquireTimeout = 3, float $expireTimeout = \INF) + { + parent::__construct('', $acquireTimeout, $expireTimeout); + + $this->mutexes = $mutexes; + $this->logger = new NullLogger(); + } + + #[\Override] + protected function acquireWithToken(string $key, float $expireTimeout) + { + $acquireTimeout = \Closure::bind(fn () => $this->acquireTimeout, $this, AbstractSpinlockMutex::class)(); + + // 1. This differs from the specification to avoid an overflow on 32-Bit systems. + $startTs = microtime(true); + + // 2. + $acquiredIndexes = []; + $errored = 0; + $exception = null; + foreach ($this->mutexes as $index => $mutex) { + try { + if ($this->acquireMutex($mutex, $key, $acquireTimeout - (microtime(true) - $startTs), $expireTimeout)) { + $acquiredIndexes[] = $index; + } + } catch (LockAcquireException $exception) { + $this->logger->warning('Could not set {key} = {token} at server #{index}', [ + 'key' => $key, + 'index' => $index, + 'exception' => $exception, + ]); + + ++$errored; + } + } + + // 3. + $elapsedTime = microtime(true) - $startTs; + $isAcquired = $this->isCountMajority(count($acquiredIndexes)) && $elapsedTime <= $expireTimeout; + + if ($isAcquired) { + $this->lockedMutexIndexes = $acquiredIndexes; + + // 4. + return LockUtil::getInstance()->makeRandomToken(); + } + + // 5. + foreach ($acquiredIndexes as $index) { + $this->releaseMutex($this->mutexes[$index], $key, $expireTimeout); + } + + // In addition to RedLock it's an exception if too many servers fail. + if (!$this->isCountMajority(count($this->mutexes) - $errored)) { + assert($exception !== null); // The last exception for some context. + + throw new LockAcquireException( + 'It is not possible to acquire a lock because at least half of the servers are not available', + LockAcquireException::CODE_REDLOCK_NOT_ENOUGH_SERVERS, + $exception + ); + } + + return false; + } + + #[\Override] + protected function releaseWithToken(string $key, string $token): bool + { + unset($token); + + $expireTimeout = \Closure::bind(fn () => $this->expireTimeout, $this, parent::class)(); + + try { + $released = 0; + foreach ($this->lockedMutexIndexes as $index) { + try { + if ($this->releaseMutex($this->mutexes[$index], $key, $expireTimeout)) { + ++$released; + } + } catch (LockReleaseException $e) { + $this->logger->warning('Could not unset {key} = {token} at server #{index}', [ + 'key' => $key, + 'index' => $index, + 'exception' => $e, + ]); + } + } + + return $this->isCountMajority($released); + } finally { + $this->lockedMutexIndexes = null; + } + } + + /** + * @return bool True if the count is the majority + */ + private function isCountMajority(int $count): bool + { + return $count > count($this->mutexes) / 2; + } + + /** + * @template T + * + * @param \Closure(): T $fx + * + * @return T + */ + private function executeMutexWithMinTimeouts(AbstractSpinlockMutex $mutex, \Closure $fx, float $acquireTimeout, float $expireTimeout) + { + if ($mutex instanceof AbstractSpinlockWithTokenMutex) { + return \Closure::bind(static function () use ($mutex, $fx, $expireTimeout) { + $origExpireTimeout = $mutex->expireTimeout; + if ($expireTimeout < $mutex->expireTimeout) { + $mutex->expireTimeout = $expireTimeout; + } + try { + return $fx(); + } finally { + $mutex->expireTimeout = $origExpireTimeout; + } + }, null, parent::class)(); + } + + return \Closure::bind(static function () use ($mutex, $fx, $acquireTimeout) { + $origAcquireTimeout = $mutex->acquireTimeout; + if ($acquireTimeout < $mutex->acquireTimeout) { + $mutex->acquireTimeout = $acquireTimeout; + } + try { + return $fx(); + } finally { + $mutex->acquireTimeout = $origAcquireTimeout; + } + }, null, AbstractSpinlockMutex::class)(); + } + + protected function acquireMutex(AbstractSpinlockMutex $mutex, string $key, float $acquireTimeout, float $expireTimeout): bool + { + return $this->executeMutexWithMinTimeouts($mutex, static fn () => $mutex->acquire($key), $acquireTimeout, $expireTimeout); + } + + protected function releaseMutex(AbstractSpinlockMutex $mutex, string $key, float $expireTimeout): bool + { + return $this->executeMutexWithMinTimeouts($mutex, static fn () => $mutex->release($key), \INF, $expireTimeout); + } +} diff --git a/src/Mutex/FlockMutex.php b/src/Mutex/FlockMutex.php index 6aef156..f04b309 100644 --- a/src/Mutex/FlockMutex.php +++ b/src/Mutex/FlockMutex.php @@ -74,11 +74,9 @@ private function lockPcntl(): void $timebox = new PcntlTimeout($acquireTimeoutInt); try { - $timebox->timeBoxed( - function (): void { - $this->lockBlocking(); - } - ); + $timebox->timeBoxed(function (): void { + $this->lockBlocking(); + }); } catch (DeadlineException $e) { throw LockAcquireTimeoutException::create($acquireTimeoutInt); } diff --git a/src/Mutex/RedisMutex.php b/src/Mutex/RedisMutex.php index b154ace..f15565d 100644 --- a/src/Mutex/RedisMutex.php +++ b/src/Mutex/RedisMutex.php @@ -11,30 +11,65 @@ use Predis\PredisException; /** - * Distributed mutex based on the Redlock algorithm supporting the phpredis extension and Predis API. + * Redis based spinlock implementation supporting the phpredis extension and Predis API. * * @phpstan-type TClient \Redis|\RedisCluster|PredisClientInterface - * - * @extends AbstractRedlockMutex - * - * @see https://redis.io/topics/distlock#the-redlock-algorithm */ -class RedisMutex extends AbstractRedlockMutex +class RedisMutex extends AbstractSpinlockWithTokenMutex { + /** @var TClient */ + private object $client; + /** - * @param TClient $client + * The Redis instance needs to be connected. I.e. Redis::connect() was called already. * - * @phpstan-assert-if-true \Redis|\RedisCluster $client + * @param TClient $client + * @param float $acquireTimeout In seconds + * @param float $expireTimeout In seconds */ - private function isClientPHPRedis(object $client): bool + public function __construct(object $client, string $name, float $acquireTimeout = 3, float $expireTimeout = \INF) { - $res = $client instanceof \Redis || $client instanceof \RedisCluster; + parent::__construct($name, $acquireTimeout, $expireTimeout); - \assert($res === !$client instanceof PredisClientInterface); + $this->client = $client; + } + + /** + * @phpstan-assert-if-true \Redis|\RedisCluster $this->client + */ + private function isClientPHPRedis(): bool + { + $res = $this->client instanceof \Redis || $this->client instanceof \RedisCluster; + + \assert($res === !$this->client instanceof PredisClientInterface); return $res; } + #[\Override] + protected function acquireWithToken(string $key, float $expireTimeout) + { + $token = LockUtil::getInstance()->makeRandomToken(); + + return $this->add($key, $token, $expireTimeout) + ? $token + : false; + } + + #[\Override] + protected function releaseWithToken(string $key, string $token): bool + { + $script = <<<'LUA' + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + LUA; + + return (int) $this->evalScript($script, [$key], [$token]) === 1; + } + private function makeRedisExpireTimeoutMillis(float $value): int { $res = LockUtil::getInstance()->castFloatToInt(ceil($value * 1000)); @@ -58,17 +93,20 @@ private function makeRedisExpireTimeoutMillis(float $value): int } /** + * Sets the key only if such key doesn't exist at the server yet. + * + * @return bool True if the key was set + * * @throws LockAcquireException */ - #[\Override] - protected function add(object $client, string $key, string $value, float $expire): bool + protected function add(string $key, string $value, float $expireTimeout): bool { - $expireTimeoutMillis = $this->makeRedisExpireTimeoutMillis($expire); + $expireTimeoutMillis = $this->makeRedisExpireTimeoutMillis($expireTimeout); - if ($this->isClientPHPRedis($client)) { + if ($this->isClientPHPRedis()) { try { // Will set the key, if it doesn't exist, with a ttl of $expire seconds - return $client->set($key, $value, ['nx', 'px' => $expireTimeoutMillis]); + return $this->client->set($key, $value, ['nx', 'px' => $expireTimeoutMillis]); } catch (\RedisException $e) { $message = sprintf( 'Failed to acquire lock for key \'%s\'', @@ -79,7 +117,7 @@ protected function add(object $client, string $key, string $value, float $expire } } else { try { - return $client->set($key, $value, 'PX', $expireTimeoutMillis, 'NX') !== null; + return $this->client->set($key, $value, 'PX', $expireTimeoutMillis, 'NX') !== null; } catch (PredisException $e) { $message = sprintf( 'Failed to acquire lock for key \'%s\'', @@ -91,24 +129,31 @@ protected function add(object $client, string $key, string $value, float $expire } } - #[\Override] - protected function evalScript(object $client, string $luaScript, array $keys, array $arguments) + /** + * @param list $keys + * @param list $arguments + * + * @return mixed The script result, or false if executing failed + * + * @throws LockReleaseException An unexpected error happened + */ + protected function evalScript(string $luaScript, array $keys, array $arguments) { - if ($this->isClientPHPRedis($client)) { - $arguments = array_map(function ($v) use ($client) { + if ($this->isClientPHPRedis()) { + $arguments = array_map(function ($v) { /* * If a serialization mode such as "php" or "igbinary" is enabled, the arguments must be * serialized by us, because phpredis does not do this for the eval command. * * The keys must not be serialized. */ - $v = $client->_serialize($v); + $v = $this->client->_serialize($v); /* * If LZF compression is enabled for the redis connection and the runtime has the LZF * extension installed, compress the arguments as the final step. */ - if ($this->isLzfCompressionEnabled($client)) { + if ($this->isLzfCompressionEnabled()) { $v = lzf_compress($v); } @@ -116,28 +161,25 @@ protected function evalScript(object $client, string $luaScript, array $keys, ar }, $arguments); try { - return $client->eval($luaScript, [...$keys, ...$arguments], count($keys)); + return $this->client->eval($luaScript, [...$keys, ...$arguments], count($keys)); } catch (\RedisException $e) { throw new LockReleaseException('Failed to release lock', 0, $e); } } else { try { - return $client->eval($luaScript, count($keys), ...$keys, ...$arguments); + return $this->client->eval($luaScript, count($keys), ...$keys, ...$arguments); } catch (PredisException $e) { throw new LockReleaseException('Failed to release lock', 0, $e); } } } - /** - * @param \Redis|\RedisCluster $client - */ - private function isLzfCompressionEnabled(object $client): bool + private function isLzfCompressionEnabled(): bool { if (!\defined('Redis::COMPRESSION_LZF')) { return false; } - return $client->getOption(\Redis::OPT_COMPRESSION) === \Redis::COMPRESSION_LZF; + return $this->client->getOption(\Redis::OPT_COMPRESSION) === \Redis::COMPRESSION_LZF; } } diff --git a/src/Mutex/SemaphoreMutex.php b/src/Mutex/SemaphoreMutex.php index a1f4bec..9feb240 100644 --- a/src/Mutex/SemaphoreMutex.php +++ b/src/Mutex/SemaphoreMutex.php @@ -29,8 +29,9 @@ class SemaphoreMutex extends AbstractLockMutex public function __construct($semaphore) { if (!$semaphore instanceof \SysvSemaphore && !is_resource($semaphore)) { - throw new \InvalidArgumentException('The semaphore id is not a valid resource'); + throw new \InvalidArgumentException('Invalid System V semaphore'); } + $this->semaphore = $semaphore; } @@ -38,7 +39,7 @@ public function __construct($semaphore) protected function lock(): void { if (!sem_acquire($this->semaphore)) { - throw new LockAcquireException('Failed to acquire the Semaphore'); + throw new LockAcquireException('Failed to acquire a System V semaphore'); } } @@ -46,7 +47,7 @@ protected function lock(): void protected function unlock(): void { if (!sem_release($this->semaphore)) { - throw new LockReleaseException('Failed to release the Semaphore'); + throw new LockReleaseException('Failed to release the System V semaphore'); } } } diff --git a/tests/Mutex/AbstractLockMutexTest.php b/tests/Mutex/AbstractLockMutexTest.php index f3ed7d3..50fcb5f 100644 --- a/tests/Mutex/AbstractLockMutexTest.php +++ b/tests/Mutex/AbstractLockMutexTest.php @@ -30,13 +30,12 @@ protected function setUp(): void */ public function testLockFails(): void { - $this->expectException(LockAcquireException::class); - $this->mutex->expects(self::once()) ->method('lock') ->willThrowException(new LockAcquireException()); - $this->mutex->synchronized(static function (): void { + $this->expectException(LockAcquireException::class); + $this->mutex->synchronized(static function () { self::fail(); }); } @@ -49,7 +48,7 @@ public function testUnlockAfterCode(): void $this->mutex->expects(self::once()) ->method('unlock'); - $this->mutex->synchronized(static function (): void {}); + $this->mutex->synchronized(static function () {}); } /** @@ -71,12 +70,11 @@ public function testUnlockAfterException(): void */ public function testUnlockFailsAfterCode(): void { - $this->expectException(LockReleaseException::class); - $this->mutex->expects(self::once()) ->method('unlock') ->willThrowException(new LockReleaseException()); + $this->expectException(LockReleaseException::class); $this->mutex->synchronized(static function () {}); } @@ -85,12 +83,11 @@ public function testUnlockFailsAfterCode(): void */ public function testUnlockFailsAfterException(): void { - $this->expectException(LockReleaseException::class); - $this->mutex->expects(self::once()) ->method('unlock') ->willThrowException(new LockReleaseException()); + $this->expectException(LockReleaseException::class); $this->mutex->synchronized(static function () { throw new \DomainException(); }); diff --git a/tests/Mutex/AbstractSpinlockMutexTest.php b/tests/Mutex/AbstractSpinlockMutexTest.php index 0e04e62..9faa11e 100644 --- a/tests/Mutex/AbstractSpinlockMutexTest.php +++ b/tests/Mutex/AbstractSpinlockMutexTest.php @@ -53,13 +53,12 @@ private function createSpinlockMutexMock(float $acquireTimeout = 3): AbstractSpi */ public function testFailAcquireLock(): void { - $this->expectException(LockAcquireException::class); - $mutex = $this->createSpinlockMutexMock(); $mutex->expects(self::any()) ->method('acquire') ->willThrowException(new LockAcquireException()); + $this->expectException(LockAcquireException::class); $mutex->synchronized(static function () { self::fail(); }); @@ -70,14 +69,13 @@ public function testFailAcquireLock(): void */ public function testAcquireTimeouts(): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of 3.0 seconds has been exceeded'); - $mutex = $this->createSpinlockMutexMock(); $mutex->expects(self::atLeastOnce()) ->method('acquire') ->willReturn(false); + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of 3.0 seconds has been exceeded'); $mutex->synchronized(static function () { self::fail(); }); @@ -106,8 +104,6 @@ public function testExecuteBarelySucceeds(): void */ public function testFailReleasingLock(): void { - $this->expectException(LockReleaseException::class); - $mutex = $this->createSpinlockMutexMock(); $mutex->expects(self::any()) ->method('acquire') @@ -116,6 +112,7 @@ public function testFailReleasingLock(): void ->method('release') ->willReturn(false); + $this->expectException(LockReleaseException::class); $mutex->synchronized(static function () {}); } } diff --git a/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php b/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php index 38810c5..34cb69a 100644 --- a/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php +++ b/tests/Mutex/AbstractSpinlockWithTokenMutexTest.php @@ -78,7 +78,6 @@ public function testExecuteTooLong(): void $this->expectException(ExecutionOutsideLockException::class); $this->expectExceptionMessageMatches('~^The code executed for 0\.2\d+ seconds\. But the expire timeout is 0\.2 seconds. The last 0\.0\d+ seconds were executed outside of the lock\.$~'); - $mutex->synchronized(static function () { usleep(201 * 1000); }); diff --git a/tests/Mutex/AbstractRedlockMutexTest.php b/tests/Mutex/DistributedMutexTest.php similarity index 54% rename from tests/Mutex/AbstractRedlockMutexTest.php rename to tests/Mutex/DistributedMutexTest.php index 807ee1d..97f83a3 100644 --- a/tests/Mutex/AbstractRedlockMutexTest.php +++ b/tests/Mutex/DistributedMutexTest.php @@ -8,7 +8,9 @@ use Malkusch\Lock\Exception\LockAcquireTimeoutException; use Malkusch\Lock\Exception\LockReleaseException; use Malkusch\Lock\Exception\MutexException; -use Malkusch\Lock\Mutex\AbstractRedlockMutex; +use Malkusch\Lock\Mutex\AbstractSpinlockMutex; +use Malkusch\Lock\Mutex\AbstractSpinlockWithTokenMutex; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Util\LockUtil; use phpmock\environment\SleepEnvironmentBuilder; use phpmock\MockEnabledException; @@ -16,8 +18,10 @@ use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use Predis\PredisException; +use Psr\Log\LoggerInterface; -class AbstractRedlockMutexTest extends TestCase +class DistributedMutexTest extends TestCase { use PHPMock; @@ -43,25 +47,35 @@ protected function setUp(): void /** * @param int $count The amount of redis APIs * - * @return AbstractRedlockMutex&MockObject + * @return DistributedMutex&MockObject */ - private function createRedlockMutexMock(int $count, float $acquireTimeout = 1, float $expireTimeout = \INF): AbstractRedlockMutex + private function createDistributedMutexMock(int $count, float $acquireTimeout = 1, float $expireTimeout = \INF): DistributedMutex { - $clients = array_map( - static fn ($i) => new class($i) { - public int $i; - - public function __construct(int $i) - { - $this->i = $i; - } + $mutexes = array_map( + function (int $i) { + $mutex = $this->getMockBuilder(AbstractSpinlockWithTokenMutex::class) + ->setConstructorArgs(['test', \INF]) + ->onlyMethods(['acquireWithToken', 'releaseWithToken']) + ->getMock(); + + $mutex + ->method('acquireWithToken') + ->with(self::anything(), \INF) + ->willReturn('x' . $i); + + $mutex + ->method('releaseWithToken') + ->with(self::anything(), 'x' . $i) + ->willReturn(true); + + return $mutex; }, range(0, $count - 1) ); - return $this->getMockBuilder(AbstractRedlockMutex::class) - ->setConstructorArgs([$clients, 'test', $acquireTimeout, $expireTimeout]) - ->onlyMethods(['add', 'evalScript']) + return $this->getMockBuilder(DistributedMutex::class) + ->setConstructorArgs([$mutexes, $acquireTimeout, $expireTimeout]) + ->onlyMethods(['acquireMutex', 'releaseMutex']) ->getMock(); } @@ -76,27 +90,22 @@ public function __construct(int $i) #[DataProvider('provideMinorityCases')] public function testTooFewServerToAcquire(int $count, int $available): void { - $this->expectException(LockAcquireException::class); - $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); - - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $i = 0; $mutex->expects(self::exactly($count)) - ->method('add') - ->willReturnCallback( - static function () use (&$i, $available): bool { - if ($i < $available) { - ++$i; - - return true; - } - - throw new LockAcquireException(); + ->method('acquireMutex') + ->willReturnCallback(static function () use (&$i, $available) { + if ($i++ < $available) { + return true; } - ); - $mutex->synchronized(static function (): void { + throw new LockAcquireException(); + }); + + $this->expectException(LockAcquireException::class); + $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); + $mutex->synchronized(static function () { self::fail(); }); } @@ -112,25 +121,21 @@ static function () use (&$i, $available): bool { #[DataProvider('provideMajorityCases')] public function testFaultTolerance(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); - $mutex->expects(self::exactly($count)) - ->method('evalScript') + $mutex = $this->createDistributedMutexMock($count); + $mutex->expects(self::exactly($available)) + ->method('releaseMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('add') - ->willReturnCallback( - static function () use (&$i, $available): bool { - if ($i < $available) { - ++$i; - - return true; - } - - throw new LockAcquireException(); + ->method('acquireMutex') + ->willReturnCallback(static function () use (&$i, $available) { + if ($i++ < $available) { + return true; } - ); + + throw new LockAcquireException(); + }); $mutex->synchronized(static function () {}); } @@ -146,23 +151,18 @@ static function () use (&$i, $available): bool { #[DataProvider('provideMinorityCases')] public function testAcquireTooFewKeys(int $count, int $available): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); - - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $i = 0; $mutex->expects(self::any()) - ->method('add') - ->willReturnCallback( - static function () use (&$i, $available): bool { - ++$i; - - return $i <= $available; - } - ); + ->method('acquireMutex') + ->willReturnCallback(static function () use (&$i, $available) { + return ++$i <= $available; + }); - $mutex->synchronized(static function (): void { + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); + $mutex->synchronized(static function () { self::fail(); }); } @@ -179,23 +179,22 @@ static function () use (&$i, $available): bool { #[DataProvider('provideAcquireTimeoutsCases')] public function testAcquireTimeouts(int $count, float $timeout, float $delay): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of ' . LockUtil::getInstance()->formatTimeout($timeout) . ' seconds has been exceeded'); - - $mutex = $this->createRedlockMutexMock($count, $timeout, $timeout); + $mutex = $this->createDistributedMutexMock($count, $timeout, $timeout); $mutex->expects(self::exactly($count)) - ->method('evalScript') + ->method('releaseMutex') ->willReturn(true); $mutex->expects(self::exactly($count)) - ->method('add') - ->willReturnCallback(static function () use ($delay): bool { + ->method('acquireMutex') + ->willReturnCallback(static function () use ($delay) { usleep((int) ($delay * 1e6)); return true; }); - $mutex->synchronized(static function (): void { + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of ' . LockUtil::getInstance()->formatTimeout($timeout) . ' seconds has been exceeded'); + $mutex->synchronized(static function () { self::fail(); }); } @@ -220,23 +219,19 @@ public static function provideAcquireTimeoutsCases(): iterable #[DataProvider('provideMajorityCases')] public function testAcquireWithMajority(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); - $mutex->expects(self::exactly($count)) - ->method('evalScript') + $mutex = $this->createDistributedMutexMock($count); + $mutex->expects(self::exactly($available)) + ->method('releaseMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('add') - ->willReturnCallback( - static function () use (&$i, $available): bool { - ++$i; - - return $i <= $available; - } - ); + ->method('acquireMutex') + ->willReturnCallback(static function () use (&$i, $available) { + return ++$i <= $available; + }); - $mutex->synchronized(static function (): void {}); + $mutex->synchronized(static function () {}); } /** @@ -250,29 +245,24 @@ static function () use (&$i, $available): bool { #[DataProvider('provideMinorityCases')] public function testTooFewServersToRelease(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('evalScript') - ->willReturnCallback( - static function () use (&$i, $available): bool { - if ($i < $available) { - ++$i; - - return true; - } - - throw new LockReleaseException(); + ->method('releaseMutex') + ->willReturnCallback(static function () use (&$i, $available) { + if ($i++ < $available) { + return true; } - ); - $this->expectException(LockReleaseException::class); + throw new LockReleaseException(); + }); - $mutex->synchronized(static function (): void {}); + $this->expectException(LockReleaseException::class); + $mutex->synchronized(static function () {}); } /** @@ -286,25 +276,20 @@ static function () use (&$i, $available): bool { #[DataProvider('provideMinorityCases')] public function testReleaseTooFewKeys(int $count, int $available): void { - $mutex = $this->createRedlockMutexMock($count); + $mutex = $this->createDistributedMutexMock($count); $mutex->expects(self::exactly($count)) - ->method('add') + ->method('acquireMutex') ->willReturn(true); $i = 0; $mutex->expects(self::exactly($count)) - ->method('evalScript') - ->willReturnCallback( - static function () use (&$i, $available): bool { - ++$i; - - return $i <= $available; - } - ); + ->method('releaseMutex') + ->willReturnCallback(static function () use (&$i, $available) { + return ++$i <= $available; + }); $this->expectException(LockReleaseException::class); - - $mutex->synchronized(static function (): void {}); + $mutex->synchronized(static function () {}); } /** @@ -337,4 +322,49 @@ public static function provideMajorityCases(): iterable yield [3, 3]; yield [4, 3]; } + + public function testAcquireMutexLogger(): void + { + $mutex = $this->createDistributedMutexMock(3); + $logger = $this->createMock(LoggerInterface::class); + $mutex->setLogger($logger); + + $mutex->expects(self::exactly(3)) + ->method('acquireMutex') + ->with(self::isInstanceOf(AbstractSpinlockMutex::class), 'php-malkusch-lock:', 1.0, \INF) + ->willThrowException($this->createMock(/* PredisException::class */ LockAcquireException::class)); + + $logger->expects(self::exactly(3)) + ->method('warning') + ->with('Could not set {key} = {token} at server #{index}', self::anything()); + + $this->expectException(LockAcquireException::class); + $this->expectExceptionMessage('It is not possible to acquire a lock because at least half of the servers are not available'); + $mutex->synchronized(static function () { + self::fail(); + }); + } + + public function testReleaseMutexLogger(): void + { + $mutex = $this->createDistributedMutexMock(3); + $logger = $this->createMock(LoggerInterface::class); + $mutex->setLogger($logger); + + $mutex->expects(self::exactly(3)) + ->method('acquireMutex') + ->willReturn(true); + + $mutex->expects(self::exactly(3)) + ->method('releaseMutex') + ->with(self::isInstanceOf(AbstractSpinlockMutex::class), 'php-malkusch-lock:', \INF) + ->willThrowException($this->createMock(/* PredisException::class */ LockReleaseException::class)); + + $logger->expects(self::exactly(3)) + ->method('warning') + ->with('Could not unset {key} = {token} at server #{index}', self::anything()); + + $this->expectException(LockReleaseException::class); + $mutex->synchronized(static function () {}); + } } diff --git a/tests/Mutex/FlockMutexTest.php b/tests/Mutex/FlockMutexTest.php index 63c7043..cd147d1 100644 --- a/tests/Mutex/FlockMutexTest.php +++ b/tests/Mutex/FlockMutexTest.php @@ -49,7 +49,7 @@ public function testCodeExecutedOutsideLockIsNotThrown(string $strategy): void { $this->mutex->strategy = $strategy; // @phpstan-ignore property.private - self::assertTrue($this->mutex->synchronized(static function (): bool { // @phpstan-ignore staticMethod.alreadyNarrowedType + self::assertTrue($this->mutex->synchronized(static function () { // @phpstan-ignore staticMethod.alreadyNarrowedType usleep(1100 * 1000); return true; @@ -64,22 +64,19 @@ public function testCodeExecutedOutsideLockIsNotThrown(string $strategy): void #[DataProvider('provideTimeoutableStrategiesCases')] public function testAcquireTimeoutOccurs(string $strategy): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); - - $another_resource = fopen($this->file, 'r'); - flock($another_resource, \LOCK_EX); + $anotherResource = fopen($this->file, 'r'); + flock($anotherResource, \LOCK_EX); $this->mutex->strategy = $strategy; // @phpstan-ignore property.private + $this->expectException(LockAcquireTimeoutException::class); + $this->expectExceptionMessage('Lock acquire timeout of 1.0 seconds has been exceeded'); try { - $this->mutex->synchronized( - static function () { - self::fail(); - } - ); + $this->mutex->synchronized(static function () { + self::fail(); + }); } finally { - fclose($another_resource); + fclose($anotherResource); } } @@ -101,16 +98,16 @@ public static function provideTimeoutableStrategiesCases(): iterable #[RequiresPhpExtension('pcntl')] public function testNoTimeoutWaitsForever(): void { - $this->expectException(DeadlineException::class); - - $another_resource = fopen($this->file, 'r'); - flock($another_resource, \LOCK_EX); + $anotherResource = fopen($this->file, 'r'); + flock($anotherResource, \LOCK_EX); $this->mutex->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_BLOCK, null, FlockMutex::class)(); // @phpstan-ignore property.private $timebox = new PcntlTimeout(1); + + $this->expectException(DeadlineException::class); $timebox->timeBoxed(function () { - $this->mutex->synchronized(static function (): void { + $this->mutex->synchronized(static function () { self::fail(); }); }); diff --git a/tests/Mutex/MemcachedMutexTest.php b/tests/Mutex/MemcachedMutexTest.php index c79e1b2..e836ed2 100644 --- a/tests/Mutex/MemcachedMutexTest.php +++ b/tests/Mutex/MemcachedMutexTest.php @@ -36,22 +36,19 @@ protected function setUp(): void public function testAcquireFail(): void { - $this->expectException(LockAcquireTimeoutException::class); - $this->memcached->expects(self::atLeastOnce()) ->method('add') ->with('php-malkusch-lock:test', true, 3) ->willReturn(false); - $this->mutex->synchronized(static function (): void { + $this->expectException(LockAcquireTimeoutException::class); + $this->mutex->synchronized(static function () { self::fail(); }); } public function testReleaseFail(): void { - $this->expectException(LockReleaseException::class); - $this->memcached->expects(self::once()) ->method('add') ->with('php-malkusch-lock:test', true, 3) @@ -62,7 +59,8 @@ public function testReleaseFail(): void ->with('php-malkusch-lock:test') ->willReturn(false); - $this->mutex->synchronized(static function (): void {}); + $this->expectException(LockReleaseException::class); + $this->mutex->synchronized(static function () {}); } public function testAcquireExpireTimeoutLimit(): void @@ -79,6 +77,6 @@ public function testAcquireExpireTimeoutLimit(): void ->with('php-malkusch-lock:test') ->willReturn(true); - $this->mutex->synchronized(static function (): void {}); + $this->mutex->synchronized(static function () {}); } } diff --git a/tests/Mutex/MutexConcurrencyTest.php b/tests/Mutex/MutexConcurrencyTest.php index 0477eda..76a39e9 100644 --- a/tests/Mutex/MutexConcurrencyTest.php +++ b/tests/Mutex/MutexConcurrencyTest.php @@ -5,6 +5,7 @@ namespace Malkusch\Lock\Tests\Mutex; use Eloquent\Liberator\Liberator; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Mutex\FlockMutex; use Malkusch\Lock\Mutex\MemcachedMutex; use Malkusch\Lock\Mutex\Mutex; @@ -79,10 +80,10 @@ public function testHighContention(\Closure $code, \Closure $mutexFactory, ?\Clo $iterations = 1000 / $concurrency; $timeout = $concurrency * 20; - $this->fork($concurrency, static function () use ($mutexFactory, $timeout, $iterations, $code): void { + $this->fork($concurrency, static function () use ($mutexFactory, $timeout, $iterations, $code) { $mutex = $mutexFactory($timeout); for ($i = 0; $i < $iterations; ++$i) { - $mutex->synchronized(static function () use ($code): void { + $mutex->synchronized(static function () use ($code) { $code(1); }); } @@ -103,7 +104,7 @@ public static function provideHighContentionCases(): iterable static::$temporaryFiles[] = $filename; yield $name => [ - static function (int $increment) use ($filename): int { + static function (int $increment) use ($filename) { $counter = file_get_contents($filename); $counter += $increment; @@ -112,7 +113,7 @@ static function (int $increment) use ($filename): int { return $counter; }, $mutexFactory, - static function () use ($filename): void { + static function () use ($filename) { file_put_contents($filename, '0'); }, ]; @@ -131,9 +132,9 @@ public function testExecutionIsSerializedWhenLocked(\Closure $mutexFactory): voi { $time = \microtime(true); - $this->fork(6, static function () use ($mutexFactory): void { + $this->fork(6, static function () use ($mutexFactory) { $mutex = $mutexFactory(3); - $mutex->synchronized(static function (): void { + $mutex->synchronized(static function () { \usleep(200 * 1000); }); }); @@ -153,14 +154,14 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable self::$temporaryFiles[] = $filename; - yield 'flock' => [static function ($timeout) use ($filename): Mutex { + yield 'flock' => [static function ($timeout) use ($filename) { $file = fopen($filename, 'w'); return new FlockMutex($file, $timeout); }]; if (extension_loaded('pcntl')) { - yield 'flockWithTimoutPcntl' => [static function ($timeout) use ($filename): Mutex { + yield 'flockWithTimoutPcntl' => [static function ($timeout) use ($filename) { $file = fopen($filename, 'w'); $lock = Liberator::liberate(new FlockMutex($file, $timeout)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_PCNTL, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -169,7 +170,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable }]; } - yield 'flockWithTimoutLoop' => [static function ($timeout) use ($filename): Mutex { + yield 'flockWithTimoutLoop' => [static function ($timeout) use ($filename) { $file = fopen($filename, 'w'); $lock = Liberator::liberate(new FlockMutex($file, $timeout)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_LOOP, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -178,7 +179,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable }]; if (extension_loaded('sysvsem')) { - yield 'semaphore' => [static function () use ($filename): Mutex { + yield 'semaphore' => [static function () use ($filename) { $semaphore = sem_get(ftok($filename, 'b')); self::assertThat( $semaphore, @@ -193,7 +194,7 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable } if (getenv('MEMCACHE_HOST')) { - yield 'memcached' => [static function ($timeout): Mutex { + yield 'memcached' => [static function ($timeout) { $memcached = new \Memcached(); $memcached->addServer(getenv('MEMCACHE_HOST'), 11211); @@ -204,18 +205,23 @@ public static function provideExecutionIsSerializedWhenLockedCases(): iterable if (getenv('REDIS_URIS')) { $uris = explode(',', getenv('REDIS_URIS')); - yield 'RedisMutex /w Predis' => [static function ($timeout) use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w Predis' => [static function ($timeout) use ($uris) { $clients = array_map( static fn ($uri) => new PredisClient($uri), $uris ); - return new RedisMutex($clients, 'test', $timeout); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', $timeout), + $clients + ); + + return new DistributedMutex($mutexes, $timeout); }]; if (class_exists(\Redis::class)) { - yield 'RedisMutex /w PHPRedis' => [ - static function ($timeout) use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w PHPRedis' => [ + static function ($timeout) use ($uris) { $clients = array_map( static function (string $uri): \Redis { $redis = new \Redis(); @@ -235,14 +241,19 @@ static function (string $uri): \Redis { $uris ); - return new RedisMutex($clients, 'test', $timeout); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', $timeout), + $clients + ); + + return new DistributedMutex($mutexes, $timeout); }, ]; } } if (getenv('MYSQL_DSN')) { - yield 'MySQLMutex' => [static function ($timeout): Mutex { + yield 'MySQLMutex' => [static function ($timeout) { $pdo = new \PDO(getenv('MYSQL_DSN'), getenv('MYSQL_USER'), getenv('MYSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); @@ -251,7 +262,7 @@ static function (string $uri): \Redis { } if (getenv('PGSQL_DSN')) { - yield 'PostgreSQLMutex' => [static function (): Mutex { + yield 'PostgreSQLMutex' => [static function () { $pdo = new \PDO(getenv('PGSQL_DSN'), getenv('PGSQL_USER'), getenv('PGSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); diff --git a/tests/Mutex/MutexTest.php b/tests/Mutex/MutexTest.php index ca4eb18..84880d2 100644 --- a/tests/Mutex/MutexTest.php +++ b/tests/Mutex/MutexTest.php @@ -7,6 +7,7 @@ use Eloquent\Liberator\Liberator; use Malkusch\Lock\Mutex\AbstractLockMutex; use Malkusch\Lock\Mutex\AbstractSpinlockMutex; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Mutex\FlockMutex; use Malkusch\Lock\Mutex\MemcachedMutex; use Malkusch\Lock\Mutex\Mutex; @@ -46,18 +47,18 @@ public static function setUpBeforeClass(): void */ public static function provideMutexFactoriesCases(): iterable { - yield 'NullMutex' => [static function (): Mutex { + yield 'NullMutex' => [static function () { return new NullMutex(); }]; - yield 'FlockMutex' => [static function (): Mutex { + yield 'FlockMutex' => [static function () { $file = fopen(vfsStream::url('test/lock'), 'w'); return new FlockMutex($file); }]; if (extension_loaded('pcntl')) { - yield 'flockWithTimoutPcntl' => [static function (): Mutex { + yield 'flockWithTimoutPcntl' => [static function () { $file = fopen(vfsStream::url('test/lock'), 'w'); $lock = Liberator::liberate(new FlockMutex($file, 3)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_PCNTL, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -66,7 +67,7 @@ public static function provideMutexFactoriesCases(): iterable }]; } - yield 'flockWithTimoutLoop' => [static function (): Mutex { + yield 'flockWithTimoutLoop' => [static function () { $file = fopen(vfsStream::url('test/lock'), 'w'); $lock = Liberator::liberate(new FlockMutex($file, 3)); $lock->strategy = \Closure::bind(static fn () => FlockMutex::STRATEGY_LOOP, null, FlockMutex::class)(); // @phpstan-ignore property.notFound @@ -75,12 +76,12 @@ public static function provideMutexFactoriesCases(): iterable }]; if (extension_loaded('sysvsem')) { - yield 'SemaphoreMutex' => [static function (): Mutex { + yield 'SemaphoreMutex' => [static function () { return new SemaphoreMutex(sem_get(ftok(__FILE__, 'a'))); }]; } - yield 'AbstractLockMutex' => [static function (): Mutex { + yield 'AbstractLockMutex' => [static function () { $lock = new class extends AbstractLockMutex { #[\Override] protected function lock(): void {} @@ -92,7 +93,7 @@ protected function unlock(): void {} return $lock; }]; - yield 'AbstractSpinlockMutex' => [static function (): Mutex { + yield 'AbstractSpinlockMutex' => [static function () { $lock = new class('test') extends AbstractSpinlockMutex { #[\Override] protected function acquire(string $key): bool @@ -111,7 +112,7 @@ protected function release(string $key): bool }]; if (getenv('MEMCACHE_HOST')) { - yield 'MemcachedMutex' => [static function (): Mutex { + yield 'MemcachedMutex' => [static function () { $memcached = new \Memcached(); $memcached->addServer(getenv('MEMCACHE_HOST'), 11211); @@ -122,18 +123,23 @@ protected function release(string $key): bool if (getenv('REDIS_URIS')) { $uris = explode(',', getenv('REDIS_URIS')); - yield 'RedisMutex /w Predis' => [static function () use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w Predis' => [static function () use ($uris) { $clients = array_map( static fn ($uri) => new PredisClient($uri), $uris ); - return new RedisMutex($clients, 'test', self::TIMEOUT); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', self::TIMEOUT), + $clients + ); + + return new DistributedMutex($mutexes, self::TIMEOUT); }]; if (class_exists(\Redis::class)) { - yield 'RedisMutex /w PHPRedis' => [ - static function () use ($uris): Mutex { + yield 'DistributedMutex RedisMutex /w PHPRedis' => [ + static function () use ($uris) { $clients = array_map( static function ($uri) { $redis = new \Redis(); @@ -153,14 +159,19 @@ static function ($uri) { $uris ); - return new RedisMutex($clients, 'test', self::TIMEOUT); + $mutexes = array_map( + static fn ($client) => new RedisMutex($client, 'test', self::TIMEOUT), + $clients + ); + + return new DistributedMutex($mutexes, self::TIMEOUT); }, ]; } } if (getenv('MYSQL_DSN')) { - yield 'MySQLMutex' => [static function (): Mutex { + yield 'MySQLMutex' => [static function () { $pdo = new \PDO(getenv('MYSQL_DSN'), getenv('MYSQL_USER'), getenv('MYSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); @@ -169,7 +180,7 @@ static function ($uri) { } if (getenv('PGSQL_DSN')) { - yield 'PostgreSQLMutex' => [static function (): Mutex { + yield 'PostgreSQLMutex' => [static function () { $pdo = new \PDO(getenv('PGSQL_DSN'), getenv('PGSQL_USER'), getenv('PGSQL_PASSWORD')); $pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); @@ -189,7 +200,7 @@ static function ($uri) { public function testSynchronizedDelegates(\Closure $mutexFactory): void { $mutex = $mutexFactory(); - $result = $mutex->synchronized(static function (): string { + $result = $mutex->synchronized(static function () { return 'test'; }); self::assertSame('test', $result); @@ -224,9 +235,9 @@ public function testRelease(\Closure $mutexFactory): void #[DataProvider('provideMutexFactoriesCases')] public function testSynchronizedPassesExceptionThrough(\Closure $mutexFactory): void { - $this->expectException(\DomainException::class); - $mutex = $mutexFactory(); + + $this->expectException(\DomainException::class); $mutex->synchronized(static function () { throw new \DomainException(); }); diff --git a/tests/Mutex/PostgreSQLMutexTest.php b/tests/Mutex/PostgreSQLMutexTest.php index 2ebd43a..c2e0fc3 100644 --- a/tests/Mutex/PostgreSQLMutexTest.php +++ b/tests/Mutex/PostgreSQLMutexTest.php @@ -46,7 +46,7 @@ public function testAcquireLock(): void ->with(self::logicalAnd( new IsType(IsType::TYPE_ARRAY), self::countOf(2), - self::callback(function (...$arguments): bool { + self::callback(function (...$arguments) { if ($this->isPhpunit9x()) { // https://github.com/sebastianbergmann/phpunit/issues/5891 $arguments = $arguments[0]; } @@ -79,7 +79,7 @@ public function testReleaseLock(): void ->with(self::logicalAnd( new IsType(IsType::TYPE_ARRAY), self::countOf(2), - self::callback(function (...$arguments): bool { + self::callback(function (...$arguments) { if ($this->isPhpunit9x()) { // https://github.com/sebastianbergmann/phpunit/issues/5891 $arguments = $arguments[0]; } diff --git a/tests/Mutex/RedisMutexTest.php b/tests/Mutex/RedisMutexTest.php index eda1089..a49136b 100644 --- a/tests/Mutex/RedisMutexTest.php +++ b/tests/Mutex/RedisMutexTest.php @@ -7,10 +7,13 @@ use Malkusch\Lock\Exception\LockAcquireException; use Malkusch\Lock\Exception\LockReleaseException; use Malkusch\Lock\Exception\MutexException; +use Malkusch\Lock\Mutex\DistributedMutex; use Malkusch\Lock\Mutex\RedisMutex; use PHPUnit\Framework\Attributes\DataProvider; use PHPUnit\Framework\Attributes\RequiresPhpExtension; +use PHPUnit\Framework\Constraint\IsType; use PHPUnit\Framework\TestCase; +use Predis\ClientInterface as PredisClientInterface; if (\PHP_MAJOR_VERSION >= 8) { trait RedisCompatibilityTrait @@ -56,6 +59,19 @@ public function set($key, $value, $options = null) } } +interface PredisClientInterfaceWithSetAndEvalMethods extends PredisClientInterface +{ + /** + * @return mixed + */ + public function eval(); + + /** + * @return mixed + */ + public function set(); +} + /** * These tests require the environment variable: * @@ -90,14 +106,13 @@ protected function setUp(): void $connection = new class extends \Redis { use RedisCompatibilityTrait; - /** @var bool */ - private $is_closed = false; + private bool $isClosed = false; #[\Override] public function close(): bool { $res = parent::close(); - $this->is_closed = true; + $this->isClosed = true; return $res; } @@ -110,7 +125,7 @@ public function close(): bool */ private function _set(string $key, $value, $timeout = 0) { - if ($this->is_closed) { + if ($this->isClosed) { throw new \RedisException('Connection is closed'); } @@ -124,7 +139,7 @@ private function _set(string $key, $value, $timeout = 0) */ private function _eval(string $script, array $args = [], int $numKeys = 0) { - if ($this->is_closed) { + if ($this->isClosed) { throw new \RedisException('Connection is closed'); } @@ -146,7 +161,7 @@ private function _eval(string $script, array $args = [], int $numKeys = 0) $this->connections[] = $connection; } - $this->mutex = new RedisMutex($this->connections, 'test'); + $this->mutex = new DistributedMutex(array_map(static fn ($v) => new RedisMutex($v, 'test'), $this->connections)); // @phpstan-ignore assign.propertyType } #[\Override] @@ -185,12 +200,11 @@ private function closeMinorityConnections(): void public function testAddFails(): void { - $this->expectException(LockAcquireException::class); - $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); - $this->closeMajorityConnections(); - $this->mutex->synchronized(static function (): void { + $this->expectException(LockAcquireException::class); + $this->expectExceptionCode(MutexException::CODE_REDLOCK_NOT_ENOUGH_SERVERS); + $this->mutex->synchronized(static function () { self::fail(); }); } @@ -201,12 +215,30 @@ public function testAddFails(): void public function testEvalScriptFails(): void { $this->expectException(LockReleaseException::class); - - $this->mutex->synchronized(function (): void { + $this->mutex->synchronized(function () { $this->closeMajorityConnections(); }); } + public function testAcquireExpireTimeoutLimit(): void + { + $client = $this->createMock(PredisClientInterfaceWithSetAndEvalMethods::class); + + $this->mutex = new RedisMutex($client, 'test'); + + $client->expects(self::once()) + ->method('set') + ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 31_557_600_000_000, 'NX') + ->willReturnSelf(); + + $client->expects(self::once()) + ->method('eval') + ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) + ->willReturn(true); + + $this->mutex->synchronized(static function () {}); + } + /** * @param \Redis::SERIALIZER_* $serializer * @param \Redis::COMPRESSION_* $compressor @@ -221,7 +253,7 @@ public function testSerializersAndCompressors(int $serializer, int $compressor): $connection->setOption(\Redis::OPT_COMPRESSION, $compressor); } - self::assertSame('test', $this->mutex->synchronized(static function (): string { + self::assertSame('test', $this->mutex->synchronized(static function () { return 'test'; })); } @@ -230,7 +262,7 @@ public function testResistantToPartialClusterFailuresForAcquiringLock(): void { $this->closeMinorityConnections(); - self::assertSame('test', $this->mutex->synchronized(static function (): string { + self::assertSame('test', $this->mutex->synchronized(static function () { return 'test'; })); } diff --git a/tests/Mutex/RedisMutexWithPredisTest.php b/tests/Mutex/RedisMutexWithPredisTest.php deleted file mode 100644 index a69e509..0000000 --- a/tests/Mutex/RedisMutexWithPredisTest.php +++ /dev/null @@ -1,166 +0,0 @@ -client = $this->createMock(PredisClientInterfaceWithSetAndEvalMethods::class); - - $this->mutex = new RedisMutex([$this->client], 'test', 2.5, 3.5); - - $this->logger = $this->createMock(LoggerInterface::class); - $this->mutex->setLogger($this->logger); - } - - /** - * Tests add() fails. - */ - public function testAddFailsToSetKey(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willReturn(null); - - $this->logger->expects(self::never()) - ->method('warning'); - - $this->expectException(LockAcquireException::class); - - $this->mutex->synchronized( - static function (): void { - self::fail(); - } - ); - } - - /** - * Tests add() errors. - */ - public function testAddErrors(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willThrowException($this->createMock(PredisException::class)); - - $this->logger->expects(self::once()) - ->method('warning') - ->with('Could not set {key} = {token} at server #{index}', self::anything()); - - $this->expectException(LockAcquireException::class); - - $this->mutex->synchronized( - static function () { - self::fail(); - } - ); - } - - public function testWorksNormally(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willReturnSelf(); - - $this->client->expects(self::once()) - ->method('eval') - ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) - ->willReturn(true); - - $executed = false; - - $this->mutex->synchronized(static function () use (&$executed): void { - $executed = true; - }); - - self::assertTrue($executed); - } - - public function testAcquireExpireTimeoutLimit(): void - { - $this->mutex = new RedisMutex([$this->client], 'test'); - - $this->client->expects(self::once()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 31_557_600_000_000, 'NX') - ->willReturnSelf(); - - $this->client->expects(self::once()) - ->method('eval') - ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) - ->willReturn(true); - - $this->mutex->synchronized(static function (): void {}); - } - - /** - * Tests evalScript() fails. - */ - public function testEvalScriptFails(): void - { - $this->client->expects(self::atLeastOnce()) - ->method('set') - ->with('php-malkusch-lock:test', new IsType(IsType::TYPE_STRING), 'PX', 3501, 'NX') - ->willReturnSelf(); - - $this->client->expects(self::once()) - ->method('eval') - ->with(self::anything(), 1, 'php-malkusch-lock:test', new IsType(IsType::TYPE_STRING)) - ->willThrowException($this->createMock(PredisException::class)); - - $this->logger->expects(self::once()) - ->method('warning') - ->with('Could not unset {key} = {token} at server #{index}', self::anything()); - - $executed = false; - - $this->expectException(LockReleaseException::class); - - $this->mutex->synchronized(static function () use (&$executed): void { - $executed = true; - }); - - self::assertTrue($executed); - } -} diff --git a/tests/Util/DoubleCheckedLockingTest.php b/tests/Util/DoubleCheckedLockingTest.php index 2a848a9..491e9ff 100644 --- a/tests/Util/DoubleCheckedLockingTest.php +++ b/tests/Util/DoubleCheckedLockingTest.php @@ -28,11 +28,11 @@ public function testCheckFailsAcquiresNoLock(): void $this->mutex->expects(self::never()) ->method('synchronized'); - $checkedLocking = new DoubleCheckedLocking($this->mutex, static function (): bool { + $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () { return false; }); - $result = $checkedLocking->then(static function (): void { + $result = $checkedLocking->then(static function () { self::fail(); }); @@ -54,7 +54,7 @@ public function testLockedCheckAndExecution(): void return $result; }); - $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () use (&$lock, &$check): bool { + $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () use (&$lock, &$check) { if ($check === 1) { self::assertSame(1, $lock); } @@ -84,12 +84,10 @@ public function testCodeNotExecuted(\Closure $check): void { $this->mutex->expects(self::any()) ->method('synchronized') - ->willReturnCallback(static function (\Closure $block) { - return $block(); - }); + ->willReturnCallback(static fn (\Closure $block) => $block()); $checkedLocking = new DoubleCheckedLocking($this->mutex, $check); - $result = $checkedLocking->then(static function (): void { + $result = $checkedLocking->then(static function () { self::fail(); }); @@ -101,12 +99,12 @@ public function testCodeNotExecuted(\Closure $check): void */ public static function provideCodeNotExecutedCases(): iterable { - yield 'failFirstCheck' => [static function (): bool { + yield 'failFirstCheck' => [static function () { return false; }]; $checkCounter = 0; - yield 'failSecondCheck' => [static function () use (&$checkCounter): bool { + yield 'failSecondCheck' => [static function () use (&$checkCounter) { return $checkCounter++ === 0; }]; } @@ -115,11 +113,9 @@ public function testCodeExecuted(): void { $this->mutex->expects(self::once()) ->method('synchronized') - ->willReturnCallback(static function (\Closure $block) { - return $block(); - }); + ->willReturnCallback(static fn (\Closure $block) => $block()); - $checkedLocking = new DoubleCheckedLocking($this->mutex, static function (): bool { + $checkedLocking = new DoubleCheckedLocking($this->mutex, static function () { return true; }); diff --git a/tests/Util/LoopTest.php b/tests/Util/LoopTest.php index 9fb020b..afeff11 100644 --- a/tests/Util/LoopTest.php +++ b/tests/Util/LoopTest.php @@ -47,7 +47,7 @@ public function testInvalidAcquireTimeout(float $acquireTimeout): void $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('The lock acquire timeout must be greater than or equal to 0.0 (' . LockUtil::getInstance()->formatTimeout($acquireTimeout) . ' was given)'); - $loop->execute(static function (): void { + $loop->execute(static function () { self::fail(); }, $acquireTimeout); } @@ -70,7 +70,7 @@ public static function provideInvalidAcquireTimeoutCases(): iterable public function testExecutionWithinAcquireTimeout(): void { $loop = new Loop(); - $loop->execute(static function () use ($loop): void { + $loop->execute(static function () use ($loop) { usleep(499 * 1000); $loop->end(); }, 0.5); @@ -82,7 +82,7 @@ public function testExecutionWithinAcquireTimeoutWithoutCallingEnd(): void $this->expectExceptionMessage('Lock acquire timeout of 0.5 seconds has been exceeded'); $loop = new Loop(); - $loop->execute(static function (): void { + $loop->execute(static function () { usleep(10 * 1000); }, 0.5); } @@ -94,7 +94,7 @@ public function testExecutionWithinAcquireTimeoutWithoutCallingEnd(): void public function testExceedAcquireTimeoutIsAcceptableIfEndWasCalled(): void { $loop = new Loop(); - $loop->execute(static function () use ($loop): void { + $loop->execute(static function () use ($loop) { usleep(501 * 1000); $loop->end(); }, 0.5); @@ -106,7 +106,7 @@ public function testExceedAcquireTimeoutWithoutCallingEnd(): void $this->expectExceptionMessage('Lock acquire timeout of 0.5 seconds has been exceeded'); $loop = new Loop(); - $loop->execute(static function (): void { + $loop->execute(static function () { usleep(501 * 1000); }, 0.5); } @@ -136,7 +136,7 @@ public function testEndCodeExecutedTwice(): void { $i = 0; $loop = new Loop(); - $loop->execute(static function () use ($loop, &$i): void { + $loop->execute(static function () use ($loop, &$i) { ++$i; if ($i > 1) { $loop->end();