Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 158 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js
(TypeScript) and Dotnet (C#).

This provides no method of tracking the outcome of work items. This is fairly simple to implement
yourself (just store the result in the redis database with a key derived from the work item id). If
you want a more fully-featured system for managing jobs, see our [Collection
This provides no method of tracking the outcome of work items. Tracking results is fairly simple to
implement yourself (just store the result in the redis database with a key derived from the work
item id). If you want a more fully-featured system for managing jobs, see our [Collection
Manager](https://github.com/MeVitae/redis-collection-manager).

Implementations in other languages are welcome, open a PR!
Expand Down Expand Up @@ -43,8 +43,6 @@ All the implementations share the same operations, on the same core types, these
Items in the work queue consist of an `id`, a string, and some `data`, arbitrary bytes.

For convenience, the IDs are often randomly generated UUIDs, however they can be customized.
Another item with the same ID as a previous item shouldn't be added until the previous item has been
completed.

### Adding an item

Expand All @@ -57,6 +55,28 @@ Adding an item is exactly what it sounds like! It adds an item to the work queue
either be in the queue or being processed (before coming back to the queue if the processing fails)
until the job is completed.

#### Adding known unique items (faster)

*Python: `WorkQueue.add_unique_item`,
Rust: [`WorkQueue::add_unique_item`](https://docs.rs/redis-work-queue/latest/redis_work_queue/struct.WorkQueue.html#method.add_unique_item),
Node.js: [`WorkQueue::add_item`](WorkQueue-addItem),
Go: [`WorkQueue.AddUniqueItem`](https://pkg.go.dev/github.com/mevitae/redis-work-queue/go#WorkQueue.AddUniqueItem)*

If you know that an item ID is not already in the queue, you can instead use an optimised
`add_unique_item` method, which skips that exact check.

*If you use this incorrectly, nothing will go too badly wrong, but the reported queue length, which
may be used for autoscaling, will be inaccurate, and leasing items will take multiple iterations.*

#### Using the right add method

The default item constructors set the item ID to a randomly generated UUID (universally *unique*
ID). If this is used, then the `add_unique_item` method should be preferred.

However, if duplicate jobs are likely to be added, then the item IDs should be set such that equal
jobs have equal IDs (for example by using a hash of the job), and then the `add_item` method should
be used, to prevent jobs from being duplicated.

### Leasing an item

*Python: `WorkQueue.lease`,
Expand All @@ -82,7 +102,7 @@ successfully working, a job will always be run to completion (even if it is run
that process).

If you're unhappy about jobs being run more than once, see [But I never want my job to run more than
once](#).
once](#but-i-never-want-my-job-to-run-more-than-once).

#### Storing the result of a work item

Expand Down Expand Up @@ -152,38 +172,38 @@ complete will return `true` for only one worker.

#### Storing the result

See [Storing the result of a work item](#)
See [Storing the result of a work item](#storing-the-result-of-a-work-item)

### Cleaning

When workers fail to complete items, or if they fail in the middle of redis operations, they can
leave the queue in a state that requires cleaning to ensure items are completed. Because of this,
cleans should occur periodically.

The frequency and schedule of these is entirely up to you. Light cleans are quick, and can be
carried out regularly. Deep cleans get get very slow depending on the size of your queues, and so
should be performed less often, but should be performed to clean up cases where workers or cleaners
have unexpectedly terminated in the middle of redis operations.

#### Light cleaning

*Python: `WorkQueue.light_clean`, Rust implementation planned, no Go or C# implementation planned*

When a worker dies while processing a job, or abandons a job, the job is left in the processing
state until it expires. The role of *light cleaning* is to move these jobs back to the main work
queue so another worker can pick them up.

The interval *light cleaning* should be run on should be approximately equal to the shortest lease
time you use.

#### Deep cleaning

*Python and Rust implementations planned, no Go or C# implementation planned*

In addition to this, a worker dying in the middle of a call to `complete` can leave database items
that are no longer associated with an active job. The job of a *deep clean* is to iterate over these
keys and make sure the database is clean.

It's very rare that deep cleaning is needed, but it can happen if you get really unlucky, so it
should be run automatically but infrequently.

The cleaning process we provide runs this every 6 hours by default.
should be run automatically but less frequently, depending on your requirements for guaranteed
completion times.

#### Cleaning process

When there are many workers of different types, it's simpler just to have a dedicated process
running the cleaning. We provide a simple cleaner, both in Python and Rust.
running the cleaning.

### Other operations

Expand All @@ -202,9 +222,127 @@ Node.js: [`WorkQueue.processing`](WorkQueue.processing),
Go: [`WorkQueue.QueueLen`](https://pkg.go.dev/github.com/mevitae/redis-work-queue/go#WorkQueue.Processing)*

This includes items being worked on and abandoned items (see [Handling errors](#handling-errors)) yet to be
returned to the main queue.
returned to the *main queue*.

## Testing

The client implementations each have their own (very simple) unit tests. Most of the testing is done
through the integrations tests, located in the [tests](./tests/) directory.

## Technical details

A queue is identified by its key prefix.

Each queue item has a unique ID, and has its own *data key*, which is `<prefix>:item:<item_id>`. The
item data is stored with this key. If this key exists, then the item is considered incomplete. If
the item data key does not exist, the item is considered completed.

The work queue then has a pair of lists, the *main queue* (`<prefix>:queue`) and *processing queue*
(`<prefix>:processing`), to track these items. However, if an item ends up in none of these queues
(which can happen if operations aren't properly completed), it is still considered an item. The
*deep clean* process fixes cases like this periodically.

An item in a queue list, but without a data key, isn't considered an item, so should be ignored and
removed from the queues when it's encountered.

More specifically, the *main queue* holds the list of item IDs which are yet to be processed. New
items are pushed to the left of the list, and leased items are popped from the right.

### Adding an item

To add an item, you must:

- Store the item data to the item data key (`<prefix>:item:<item_id>`), then
- Push the item to the left of the *main queue*.
- The push should be done second, to prevent waiting workers from popping the item immediately,
before the data key is set.

If the item's data key already exists, you shouldn't push it to the *main queue* again, since this
will cause it to be counted twice when getting the queue length, which can have negative impact on
queue-length based autoscaling. Furthermore, when the item is completed, it's copy will still be in
the *main queue*, so leasing will take more iterations.

If the ID is already known to be unique (for example UUIDs), you can safely pipeline these
operations and skip the check.

### Leasing an item

The fetch an item to work on, a worker should pop from the right of the *main queue*, and push to the
left of the *processing queue* (`rpoplpush`).

The *processing queue* is a method used to track the items currently being processed, to make the
cleaning process more efficient.

Furthermore, while an item is being processed, it has a *lease key*, which is
`<prefix>:lease:<item_id>`. The value of this is the session ID of the worker which got the lease.
Lease keys are set with an expiry, once the key has expired (or is otherwise deleted), the session
is deemed to have failed working on the item, and the item will be added, by the cleaning process,
to the end of the *main queue*.

The lease function should therefore:

- `rpoplpush` from the *main queue* to the processing queue,
- Load the data for this item
- and, if it doesn't exist, ignore the item,
- *(in our provided client implementations, if lease is requested to block, and there's no
timeout, the lease method just tries to get the next item. In any other case, the method returns
with no item)*, then
- set the *lease key*, with an expiry time longer than the expected job duration.

### Completing an item

When completing an item, you must:
- Remove the data key.

You should also:
- Remove the *lease key*, and
- Remove the item ID from the *processing queue*.

The item is considered to be removed exactly when the data key is removed, but the other steps keep
things tidy (without removing the lease, it would eventually expire, and the cleaning process would
later remove the item ID from the processing queue anyway).

The completion methods also return a boolean, for which only one remove call must return true. This
boolean can be decided by the output of the command to delete the data key. If it deletes the item,
this is the call that completed the item. Otherwise, another worker has already completed it.

### Cleaning

Items are considered items only when their data key exists.

If a lease does not exist for the item, then processing must have failed before the item was
completed, and the item should be available again for a lease.

The cleaning process finds items:

- That exist,
- Aren't in the *main queue*, and
- Don't have a lease.

And then:

- Removes them from the processing queue, and
- Pushes them to the *main queue*.

The item should be removed from the processing queue first.

#### Deep clean

The deep cleaning process is complete. It uses `keys` to enumerate all the data items for checking.

#### Light clean

Using `keys` can cause significant performance issues, so should ideally be avoided.

This is why we have the *processing queue*. So long as all operations complete fully, any item with
an expired lease will be in the processing queue, we can therefore follow the usual cleaning
algorithm, but instead only use the item IDs from the *processing queue*.

#### Cleaning schedule

This is entirely up to you. Light cleans are quick, and can be carried out regularly. Deep cleans
get get very slow depending on the size of your queues, and so should be performed less often, but
should be performed to clean up cases where workers or cleaners have unexpectedly terminated in the
middle of redis operations.

*Light cleaning* should be run on should be approximately equal to the shortest lease time you use.
Binary file modified dotnet/RedisWorkQueue.pdf
Binary file not shown.
42 changes: 4 additions & 38 deletions dotnet/RedisWorkQueue/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,12 @@ readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md).

## Documentation

Below is a brief overview and an example. More details on the core concepts can be found in the
[readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md), and full API
documentation can be found in
Below is a brief example. More details on the core concepts can be found in the
[readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md), and
full API documentation can be found in
[../RedisWorkQueue.pdf](https://github.com/MeVitae/redis-work-queue/blob/main/dotnet/RedisWorkQueue.pdf).

## WorkQueue

The `WorkQueue` class represents a work queue backed by a Redis database. It provides methods to add
items to the queue, lease items from the queue, and mark completed items as done.

### Properties

- `Session`: Gets or sets the unique identifier for the current session.

### Methods

- `AddItem(IRedisClient db, Item item)`: Adds an item to the work queue. The `db` parameter is the
Redis instance and the `item` parameter is the item to be added.

- `QueueLength(IRedisClient db)`: Gets the length of the main queue. The `db` parameter is the Redis
instance.

- `Processing(IRedisClient db)`: Gets the length of the processing queue. The `db` parameter is the
Redis instance.

- `LeaseExists(IRedisClient db, string itemId)`: Checks if a lease exists for the specified item ID.
The `db` parameter is the Redis instance and the `itemId` parameter is the ID of the item to
check.

- `Lease(IRedisClient db, int leaseSeconds, bool block, int timeout = 0)`: Requests a work lease
from the work queue. This should be called by a worker to get work to complete. The `db` parameter
is the Redis instance, the `leaseSeconds` parameter is the number of seconds to lease the item for,
the `block` parameter indicates whether to block and wait for an item to be available if the main
queue is empty, and the `timeout` parameter is the maximum time to block in seconds.

- `Complete(IRedisClient db, Item item)`: Marks a job as completed and removes it from the work
queue. The `db` parameter is the Redis instance and the `item` parameter is the item to be
completed.

### Example Usage
## Example Usage

```csharp
using FreeRedis;
Expand Down
6 changes: 3 additions & 3 deletions dotnet/RedisWorkQueue/RedisWorkQueue.csproj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
<Nullable>enable</Nullable>
<PackageId>MeVitae.RedisWorkQueue</PackageId>
<Version>0.2.1</Version>
<Version>0.3.0</Version>
<Authors>Jacob O'Toole, Nathan Lamplough, Ilie Mihai Alexandru</Authors>
<Company>MeVitae</Company>
<RepositoryUrl>https://github.com/MeVitae/redis-work-queue</RepositoryUrl>
Expand All @@ -22,7 +22,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FreeRedis" Version="1.2.5" />
<PackageReference Include="FreeRedis" Version="1.3.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>

Expand Down
Loading