Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"Symfony",
"Thich",
"Traceparent",
"ttls",
"tunlp",
"ucsplit",
"undot",
Expand Down
50 changes: 50 additions & 0 deletions src/lock/src/Driver/AbstractLock.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ abstract class AbstractLock implements LockInterface
*/
protected int $sleepMilliseconds = 250;

/**
* The timestamp when the lock was acquired.
*/
protected ?float $acquiredAt = null;

/**
* Create a new lock instance.
*/
Expand Down Expand Up @@ -131,6 +136,51 @@ public function isOwnedBy($owner): bool
return $this->getCurrentOwner() === $owner;
}

/**
* Refresh the lock expiration time.
* {@inheritdoc}
*/
#[Override]
abstract public function refresh(?int $ttl = null): bool;

/**
* Check if the lock has expired.
* {@inheritdoc}
*/
#[Override]
public function isExpired(): bool
{
if ($this->seconds <= 0) {
return false;
}

if ($this->acquiredAt === null) {
return true;
}

return microtime(true) >= ($this->acquiredAt + $this->seconds);
}

/**
* Get the remaining lifetime of the lock in seconds.
* {@inheritdoc}
*/
#[Override]
public function getRemainingLifetime(): ?float
{
if ($this->seconds <= 0) {
return null;
}

if ($this->acquiredAt === null) {
return null;
}

$remaining = ($this->acquiredAt + $this->seconds) - microtime(true);

return $remaining > 0 ? $remaining : 0.0;
}

/**
* Returns the owner value written into the driver for this lock.
*/
Expand Down
43 changes: 41 additions & 2 deletions src/lock/src/Driver/CacheLock.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ public function acquire(): bool
return false;
}

return $this->store->set($this->name, $this->owner, $this->seconds);
$result = $this->store->set($this->name, $this->owner, $this->seconds);

if ($result) {
$this->acquiredAt = microtime(true);
}

return $result;
}

/**
Expand All @@ -56,7 +62,13 @@ public function acquire(): bool
public function release(): bool
{
if ($this->isOwnedByCurrentProcess()) {
return $this->store->delete($this->name);
$result = $this->store->delete($this->name);

if ($result) {
$this->acquiredAt = null;
}

return $result;
}

return false;
Expand All @@ -69,6 +81,33 @@ public function release(): bool
public function forceRelease(): void
{
$this->store->delete($this->name);
$this->acquiredAt = null;
}

/**
* Refresh the lock expiration time.
*/
#[Override]
public function refresh(?int $ttl = null): bool
{
$ttl = $ttl ?? $this->seconds;

if ($ttl <= 0) {
return false;
}

if (! $this->isOwnedByCurrentProcess()) {
return false;
}

$result = $this->store->set($this->name, $this->owner, $ttl);

if ($result) {
$this->seconds = $ttl;
$this->acquiredAt = microtime(true);
}

return $result;
}

/**
Expand Down
135 changes: 129 additions & 6 deletions src/lock/src/Driver/CoroutineLock.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,44 @@
class CoroutineLock extends AbstractLock
{
/**
* Mapping of lock names to their corresponding channels.
*
* @var array<string, Channel>
*/
protected static array $channels = [];

/**
* Mapping of channels to their current owners (used for ownership verification).
*
* @var null|WeakMap<Channel, string>
*/
protected static ?WeakMap $owners = null;

/**
* Timer instance for scheduling lock expiration.
*/
protected static ?Timer $timer = null;

/**
* Mapping of channels to their timer IDs (for clearing expiration timers).
*
* @var null|WeakMap<Channel, int>
*/
protected static ?WeakMap $timerIds = null;

/**
* Mapping of channels to their acquisition timestamps (for tracking expiration).
*
* @var null|WeakMap<Channel, float>
*/
protected static ?WeakMap $acquiredTimes = null;

/**
* Mapping of channels to their TTL values (for tracking remaining lifetime).
*
* @var null|WeakMap<Channel, int>
*/
protected static ?WeakMap $timers = null;
protected static ?WeakMap $ttls = null;

/**
* Create a new lock instance.
Expand All @@ -51,8 +74,10 @@ public function __construct(
parent::__construct($name, $seconds, $owner);

self::$owners ??= new WeakMap();
self::$timers ??= new WeakMap();
self::$acquiredTimes ??= new WeakMap();
self::$ttls ??= new WeakMap();
self::$timer ??= new Timer();
self::$timerIds ??= new WeakMap();
}

/**
Expand All @@ -69,14 +94,17 @@ public function acquire(): bool
}

self::$owners[$chan] = $this->owner;
$this->acquiredAt = microtime(true);
self::$acquiredTimes[$chan] = $this->acquiredAt;
self::$ttls[$chan] = $this->seconds;

if ($timeId = self::$timers[$chan] ?? null) {
if ($timeId = self::$timerIds[$chan] ?? null) {
self::$timer?->clear((int) $timeId);
}

if ($this->seconds > 0) {
$timeId = self::$timer?->after($this->seconds * 1000, fn () => $this->forceRelease());
$timeId && self::$timers[$chan] = $timeId;
$timeId = self::$timer?->after($this->seconds, fn () => $this->forceRelease());
$timeId && self::$timerIds[$chan] = $timeId;
}
} catch (Throwable) {
return false;
Expand All @@ -92,7 +120,13 @@ public function acquire(): bool
public function release(): bool
{
if ($this->isOwnedByCurrentProcess()) {
return (self::$channels[$this->name] ?? null)?->pop(0.01) ? true : false;
$result = (self::$channels[$this->name] ?? null)?->pop(0.01) ? true : false;

if ($result) {
$this->acquiredAt = null;
}

return $result;
}

return false;
Expand All @@ -109,10 +143,99 @@ public function forceRelease(): void
}

self::$channels[$this->name] = null;
$this->acquiredAt = null;

$chan->close();
}

/**
* Refresh the lock expiration time.
*/
#[Override]
public function refresh(?int $ttl = null): bool
{
$ttl = $ttl ?? $this->seconds;

if ($ttl <= 0) {
return false;
}

if (! $this->isOwnedByCurrentProcess()) {
return false;
}

if (! $chan = self::$channels[$this->name] ?? null) {
return false;
}

// Clear existing timer
if ($timeId = self::$timerIds[$chan] ?? null) {
self::$timer?->clear((int) $timeId);
}

// Update TTL and acquired time
$this->seconds = $ttl;
$this->acquiredAt = microtime(true);
self::$acquiredTimes[$chan] = $this->acquiredAt;
self::$ttls[$chan] = $ttl;

// Set new timer
$timeId = self::$timer?->after($ttl, fn () => $this->forceRelease());
$timeId && self::$timerIds[$chan] = $timeId;

return true;
}

/**
* Check if the lock has expired.
*/
#[Override]
public function isExpired(): bool
{
if ($this->seconds <= 0) {
return false;
}

if (! $chan = self::$channels[$this->name] ?? null) {
return true;
}

$acquiredAt = self::$acquiredTimes[$chan] ?? null;
$ttl = self::$ttls[$chan] ?? $this->seconds;

if ($acquiredAt === null) {
return true;
}

return microtime(true) >= ($acquiredAt + $ttl);
}

/**
* Get the remaining lifetime of the lock in seconds.
*/
#[Override]
public function getRemainingLifetime(): ?float
{
if ($this->seconds <= 0) {
return null;
}

if (! $chan = self::$channels[$this->name] ?? null) {
return null;
}

$acquiredAt = self::$acquiredTimes[$chan] ?? null;
$ttl = self::$ttls[$chan] ?? $this->seconds;

if ($acquiredAt === null) {
return null;
}

$remaining = ($acquiredAt + $ttl) - microtime(true);

return $remaining > 0 ? $remaining : 0.0;
}

/**
* Returns the owner value written into the driver for this lock.
* @return string
Expand Down
35 changes: 35 additions & 0 deletions src/lock/src/Driver/DatabaseLock.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public function acquire(): bool
$acquired = $updated >= 1;
}

if ($acquired) {
$this->acquiredAt = microtime(true);
}

return $acquired;
}

Expand All @@ -83,6 +87,8 @@ public function release(): bool
->where('owner', $this->owner)
->delete();

$this->acquiredAt = null;

return true;
}

Expand All @@ -98,6 +104,35 @@ public function forceRelease(): void
$this->connection->table($this->table)
->where('key', $this->name)
->delete();
$this->acquiredAt = null;
}

/**
* Refresh the lock expiration time.
*/
#[Override]
public function refresh(?int $ttl = null): bool
{
$ttl = $ttl ?? $this->seconds;

if ($ttl <= 0) {
return false;
}

$updated = $this->connection->table($this->table)
->where('key', $this->name)
->where('owner', $this->owner)
->update([
'expiration' => time() + $ttl,
]);

if ($updated >= 1) {
$this->seconds = $ttl;
$this->acquiredAt = microtime(true);
return true;
}

return false;
}

/**
Expand Down
Loading