Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dotnet/RedisWorkQueue/RedisWorkQueue.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<TargetFrameworks>net6.0</TargetFrameworks>
<Nullable>enable</Nullable>
<PackageId>MeVitae.RedisWorkQueue</PackageId>
<Version>0.2.1</Version>
Expand Down
60 changes: 59 additions & 1 deletion dotnet/RedisWorkQueue/WorkQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class WorkQueue
/// </summary>
private KeyPrefix ItemDataKey { get; set; }

private string CleaningKey { get; set; }

/// <summary>
/// Creates a new instance of the WorkQueue class with based on name given name.
/// </summary>
Expand All @@ -46,6 +48,7 @@ public WorkQueue(KeyPrefix name)
this.ProcessingKey = name.Of(":processing");
this.LeaseKey = name.Concat(":leased_by_session:");
this.ItemDataKey = name.Concat(":item:");
this.CleaningKey = name.Of(":cleaning");
}

/// <summary>
Expand Down Expand Up @@ -84,6 +87,17 @@ public long Processing(IRedisClient db)
return db.LLen(ProcessingKey);
}

/// <summary>
/// Checks if data exists for the specified item ID.
/// </summary>
/// <param name="db">The Redis client instance.</param>
/// <param name="itemId">The ID of the item to check.</param>
/// <returns>True if data exists, false otherwise.</returns>
public bool DataExists(IRedisClient db, string itemId)
{
return db.Exists(ItemDataKey.Of(itemId));
}

/// <summary>
/// Checks if a lease exists for the specified item ID.
/// </summary>
Expand Down Expand Up @@ -135,7 +149,7 @@ public bool LeaseExists(IRedisClient db, string itemId)

var data = db.Get<byte[]>(ItemDataKey.Of(itemId));
if (data == null)
data = new byte[0];
return null;

db.SetEx(LeaseKey.Of(itemId), leaseSeconds, Encoding.UTF8.GetBytes(Session));

Expand Down Expand Up @@ -167,5 +181,49 @@ public bool Complete(IRedisClient db, Item item)

return true;
}


/// <summary>
/// Moves abandoned/stuck in processing jobs back to the main work queue
/// First collects items in the processing queue
/// Then re adds items without a lease (due to expiry or never created) to the main queue and cleaning queue
/// Next we do the same for items in cleaning queue, this handles cases when the cleaner crashes
/// </summary>
/// <param name="db">The Redis client instance</param>
public void LightClean(IRedisClient db)
{
var processing = db.LRange(ProcessingKey, 0, -1);
foreach (var item_id in processing)
{
if (!LeaseExists(db, item_id))
{
Console.WriteLine($"{item_id} has no lease");
db.LPush(CleaningKey, item_id);
var removed = db.LRem(ProcessingKey, 0, item_id);
if (removed > 0)
{
db.LPush(MainQueueKey, item_id);
Console.WriteLine($"{item_id} was still in the processing queue, it was reset");
}
else
Console.WriteLine($"{item_id} was no longer in the processing queue");
db.LRem(CleaningKey, 0, item_id);
}
}

var forgot = db.LRange(CleaningKey, 0, -1);
foreach (var item_id in forgot)
{
Console.WriteLine($"{item_id} was forgotten in clean");
//FreeRedis LPos always returns a long
//need to confirm if -1 is returned in place of NIL
if (DataExists(db, item_id) && db.LPos(MainQueueKey, item_id) < 0 && db.LPos(ProcessingKey, item_id) < 0)
Comment on lines +218 to +220
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect LPos might return 0 if the response is nil... Which makes it impossible to use this method to determine if the item is in the queue...

The tests are failing (for a simple case that only requires Python and dotnet, and uses the dotnet cleaning, from within the tests directory, run ./run-test.sh -t python_jobs,dotnet_jobs -c ./dotnet-cleaner/run.sh) and I think this is the reason it fails 😞

{
db.LPush(MainQueueKey, item_id);
Console.WriteLine($"{item_id} was not in any queue, it was reset");
}
db.LRem(CleaningKey, 0, item_id);
}
}
}
}
3 changes: 3 additions & 0 deletions go/WorkQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ func (workQueue *WorkQueue) Lease(
// Get the item's data
data, err := db.Get(ctx, workQueue.itemDataKey.Of(itemId)).Bytes()
if err != nil {
if err == redis.Nil {
return nil, nil
}
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions node/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions node/src/WorkQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,25 @@ export class WorkQueue {
}

/**
* This method can be used to check if a Lease Exists or not for a itemId.
* This method can be used to check if data exists or not for a itemId.
*
* @param {Redis} db The Redis Connection.
* @param {string} itemId The itemId of the item you want to check if it has data.
* @returns {Promise<boolean>}
*/
async dataExists(db: Redis, itemId: string): Promise<boolean> {
return await db.exists(this.itemDataKey.of(itemId)) !== 0;
}

/**
* This method can be used to check if a lease exists or not for a itemId.
*
* @param {Redis} db The Redis Connection.
* @param {string} itemId The itemId of the item you want to check if it has a lease.
* @returns {Promise<boolean>}
*/
async leaseExists(db: Redis, itemId: string): Promise<boolean> {
const exists = await db.exists(this.leaseKey.of(itemId));
return exists !== 0;
return await db.exists(this.leaseKey.of(itemId)) !== 0;
}

/**
Expand Down Expand Up @@ -141,7 +151,7 @@ export class WorkQueue {
let data: Buffer | null = await db.getBuffer(this.itemDataKey.of(itemId));

if (data == null) {
data = Buffer.alloc(0);
return null;
}

// Setup the lease item.
Expand Down Expand Up @@ -178,9 +188,8 @@ export class WorkQueue {

const forgot: Array<string> = await db.lrange(this.cleaningKey, 0, -1);
for (const itemId of forgot) {
const leaseExists: boolean = await this.leaseExists(db, itemId);
if (
!leaseExists &&
(await this.dataExists(db, itemId)) &&
(await db.lpos(this.mainQueueKey, itemId)) == null &&
(await db.lpos(this.processingKey, itemId)) == null
) {
Expand Down
8 changes: 6 additions & 2 deletions python/redis_work_queue/workqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def light_clean(self, db: Redis) -> None:
if isinstance(item_id, bytes):
item_id = item_id.decode('utf-8')
print(item_id, 'was forgotten in clean')
if not self._lease_exists(db, item_id) and \
if self._data_exists(db, item_id) and \
db.lpos(self._main_queue_key, item_id) is None and \
db.lpos(self._processing_key, item_id) is None:
# FIXME: this introcudes a race
Expand All @@ -85,6 +85,10 @@ def light_clean(self, db: Redis) -> None:
print(item_id, 'was not in any queue, it was reset')
db.lrem(self._cleaning_key, 0, item_id)

def _data_exists(self, db: Redis, item_id: str) -> bool:
"""True iff a lease on 'item_id' exists."""
return db.exists(self._item_data_key.of(item_id)) != 0

def _lease_exists(self, db: Redis, item_id: str) -> bool:
"""True iff a lease on 'item_id' exists."""
return db.exists(self._lease_key.of(item_id)) != 0
Expand Down Expand Up @@ -129,7 +133,7 @@ def lease(self, db: Redis, lease_secs: int, block=True, timeout=0) -> Item | Non
# If we got an item, fetch the associated data.
data: bytes | None = db.get(self._item_data_key.of(item_id))
if data is None:
data = bytes()
return None

# Setup the lease item.
# NOTE: Racing for a lease is ok.
Expand Down
9 changes: 6 additions & 3 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,13 @@ impl WorkQueue {
// If we got an item, fetch the associated data.
let item = match item_id {
Some(item_id) => Item {
data: db
.get::<_, Vec<u8>>(self.item_data_key.of(&item_id))
data: match db
.get::<_, Option<Vec<u8>>>(self.item_data_key.of(&item_id))
.await?
.into_boxed_slice(),
{
Some(data) => data.into_boxed_slice(),
None => return Ok(None),
},
id: item_id,
},
None => return Ok(None),
Expand Down
13 changes: 12 additions & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ From the `tests` directory, to run the integration tests with all languages, use
./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs
```

To do the same, but wit the DotNet implementation of the cleaner, use:

```bash
./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs -c ./dotnet-cleaner/run.sh
```

For a summary of other options, run:

```bash
Expand All @@ -38,14 +44,19 @@ For example:

##### --host

This can be used to set a specific redis server, the default is `localhost`.
This can be used to set a specific redis server, the default is `localhost:6379`.

For example:

```bash
./run-test.sh --tests "go_jobs,python_jobs" --host example.server.net:port
```

##### --cleaner

This sets a custom binary to be used to clean the work queues, see the docs in
[job-spawner-and-cleaner.py](./job-spawner-and-cleaner.py).

## Unit tests also exist

Each client implementation contains some unit tests. These are located within the implementations
Expand Down
18 changes: 18 additions & 0 deletions tests/dotnet-cleaner/Cleaner/Cleaner.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FreeRedis" Version="1.2.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include = "../../../dotnet/RedisWorkQueue/RedisWorkQueue.csproj" />
</ItemGroup>

</Project>
18 changes: 18 additions & 0 deletions tests/dotnet-cleaner/Cleaner/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using RedisWorkQueue;
using FreeRedis;

class Program
{
public static void Main(string[] args)
{
RedisClient db = new RedisClient(args[0]);
for (; ; )
{
string? queueName = Console.ReadLine();
if (queueName == null) break;
if (queueName == "") throw new Exception("input line must be non-empty");
new WorkQueue(new KeyPrefix(queueName)).LightClean(db);
Console.WriteLine("cleaned " + queueName);
}
}
}
3 changes: 3 additions & 0 deletions tests/dotnet-cleaner/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
cd "$(realpath "$(dirname $0)")"/Cleaner
exec dotnet run -v quiet "$@"
4 changes: 2 additions & 2 deletions tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include = "../../../dotnet/RedisWorkQueue/RedisWorkQueue.csproj" />
</ItemGroup>
<ProjectReference Include = "../../../dotnet/RedisWorkQueue/RedisWorkQueue.csproj" />
</ItemGroup>

</Project>
4 changes: 2 additions & 2 deletions tests/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ go 1.20

require (
github.com/mevitae/redis-work-queue/go v0.1.0
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.3.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.4.0 // indirect
)
16 changes: 6 additions & 10 deletions tests/go/go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
Loading