diff --git a/README.md b/README.md index 65d3284..ca2d06a 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,9 @@ A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#). -This provides no method of tracking the outcome of work items. This is fairly simple to implement -yourself (just store the result in the redis database with a key derived from the work item id). If -you want a more fully-featured system for managing jobs, see our [Collection +This provides no method of tracking the outcome of work items. Tracking results is fairly simple to +implement yourself (just store the result in the redis database with a key derived from the work +item id). If you want a more fully-featured system for managing jobs, see our [Collection Manager](https://github.com/MeVitae/redis-collection-manager). Implementations in other languages are welcome, open a PR! @@ -43,8 +43,6 @@ All the implementations share the same operations, on the same core types, these Items in the work queue consist of an `id`, a string, and some `data`, arbitrary bytes. For convenience, the IDs are often randomly generated UUIDs, however they can be customized. -Another item with the same ID as a previous item shouldn't be added until the previous item has been -completed. ### Adding an item @@ -57,6 +55,28 @@ Adding an item is exactly what it sounds like! It adds an item to the work queue either be in the queue or being processed (before coming back to the queue if the processing fails) until the job is completed. 
+#### Adding known unique items (faster) + +*Python: `WorkQueue.add_unique_item`, +Rust: [`WorkQueue::add_unique_item`](https://docs.rs/redis-work-queue/latest/redis_work_queue/struct.WorkQueue.html#method.add_unique_item), +Node.js: [`WorkQueue.addUniqueItem`](WorkQueue-addItem), +Go: [`WorkQueue.AddUniqueItem`](https://pkg.go.dev/github.com/mevitae/redis-work-queue/go#WorkQueue.AddUniqueItem)* + +If you know that an item ID is not already in the queue, you can instead use an optimised +`add_unique_item` method, which skips the existence check. + +*If you use this incorrectly, nothing will go too badly wrong, but the reported queue length, which +may be used for autoscaling, will be inaccurate, and leasing items will take multiple iterations.* + +#### Using the right add method + +The default item constructors set the item ID to a randomly generated UUID (universally *unique* +ID). If this is used, then the `add_unique_item` method should be preferred. + +However, if duplicate jobs are likely to be added, then the item IDs should be set such that equal +jobs have equal IDs (for example by using a hash of the job), and then the `add_item` method should +be used, to prevent jobs from being duplicated. + ### Leasing an item *Python: `WorkQueue.lease`, @@ -82,7 +102,7 @@ successfully working, a job will always be run to completion (even if it is run that process). If you're unhappy about jobs being run more than once, see [But I never want my job to run more than -once](#). +once](#but-i-never-want-my-job-to-run-more-than-once). #### Storing the result of a work item @@ -152,18 +172,23 @@ complete will return `true` for only one worker. #### Storing the result -See [Storing the result of a work item](#) +See [Storing the result of a work item](#storing-the-result-of-a-work-item) ### Cleaning +When workers fail to complete items, or if they fail in the middle of redis operations, they can +leave the queue in a state that requires cleaning to ensure items are completed.
Because of this, +cleans should occur periodically. + +The frequency and schedule of these are entirely up to you. Light cleans are quick, and can be +carried out regularly. Deep cleans can get very slow depending on the size of your queues, and so +should be performed less often, but should be performed to clean up cases where workers or cleaners +have unexpectedly terminated in the middle of redis operations. + #### Light cleaning *Python: `WorkQueue.light_clean`, Rust implementation planned, no Go or C# implementation planned* -When a worker dies while processing a job, or abandons a job, the job is left in the processing -state until it expires. The role of *light cleaning* is to move these jobs back to the main work -queue so another worker can pick them up. - The interval *light cleaning* should be run on should be approximately equal to the shortest lease time you use. @@ -171,19 +196,14 @@ time you use. *Python and Rust implementations planned, no Go or C# implementation planned* -In addition to this, a worker dying in the middle of a call to `complete` can leave database items -that are no longer associated with an active job. The job of a *deep clean* is to iterate over these -keys and make sure the database is clean. - It's very rare that deep cleaning is needed, but it can happen if you get really unlucky, so it -should be run automatically but infrequently. - -The cleaning process we provide runs this every 6 hours by default. +should be run automatically but less frequently, depending on your requirements for guaranteed +completion times. #### Cleaning process When there are many workers of different types, it's simpler just to have a dedicated process -running the cleaning. We provide a simple cleaner, both in Python and Rust. +running the cleaning.
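As a rough sketch of such a dedicated cleaner process (hypothetical names and intervals, not the provided cleaner), it can simply alternate frequent light cleans with occasional deep cleans:

```python
import time

# Hypothetical intervals: tune the light interval to roughly your shortest
# lease time, and the deep interval to your completion-time requirements.
LIGHT_CLEAN_INTERVAL = 60.0
DEEP_CLEAN_INTERVAL = 6 * 60 * 60.0

def run_cleaner(light_clean, deep_clean, cycles,
                now=time.monotonic, sleep=time.sleep):
    """Run `cycles` rounds of cleaning: a light clean every round, and a
    deep clean whenever DEEP_CLEAN_INTERVAL has elapsed since the last one."""
    last_deep = now()
    for _ in range(cycles):
        light_clean()
        if now() - last_deep >= DEEP_CLEAN_INTERVAL:
            deep_clean()
            last_deep = now()
        sleep(LIGHT_CLEAN_INTERVAL)
```

A real deployment would loop forever rather than for a fixed number of cycles; `cycles`, `now` and `sleep` are parameters here only so the schedule is easy to test.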
### Other operations @@ -202,9 +222,127 @@ Node.js: [`WorkQueue.processing`](WorkQueue.processing), Go: [`WorkQueue.QueueLen`](https://pkg.go.dev/github.com/mevitae/redis-work-queue/go#WorkQueue.Processing)* This includes items being worked on and abandoned items (see [Handling errors](#handling-errors)) yet to be -returned to the main queue. +returned to the *main queue*. ## Testing The client implementations each have their own (very simple) unit tests. Most of the testing is done through the integrations tests, located in the [tests](./tests/) directory. + +## Technical details + +A queue is identified by its key prefix. + +Each queue item has a unique ID, and has its own *data key*, which is `<prefix>:item:<id>`. The +item data is stored with this key. If this key exists, then the item is considered incomplete. If +the item data key does not exist, the item is considered completed. + +The work queue then has a pair of lists, the *main queue* (`<prefix>:queue`) and *processing queue* +(`<prefix>:processing`), to track these items. However, if an item ends up in none of these queues +(which can happen if operations aren't properly completed), it is still considered an item. The +*deep clean* process fixes cases like this periodically. + +An item in a queue list, but without a data key, isn't considered an item, so should be ignored and +removed from the queues when it's encountered. + +More specifically, the *main queue* holds the list of item IDs which are yet to be processed. New +items are pushed to the left of the list, and leased items are popped from the right. + +### Adding an item + +To add an item, you must: + +- Store the item data to the item data key (`<prefix>:item:<id>`), then +- Push the item to the left of the *main queue*. + - The push should be done second, to prevent waiting workers from popping the item immediately, + before the data key is set.
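The two steps above can be sketched with in-memory stand-ins for the redis structures (a dict for the data keys and a deque for the *main queue*; the names are illustrative, and against a real database each step would be a redis command, `setnx` then `lpush`):

```python
from collections import deque

# In-memory stand-ins for the redis structures (illustrative, not a real client)
data = {}             # "<prefix>:item:<id>" -> payload bytes
main_queue = deque()  # "<prefix>:queue": appendleft = LPUSH, pop = RPOP

def add_item(item_id: str, payload: bytes) -> bool:
    """Store the data key first, then push the ID; reject duplicate IDs (SETNX)."""
    key = f":item:{item_id}"
    if key in data:                     # SETNX fails: the item already exists
        return False
    data[key] = payload                 # 1. store the item data first
    main_queue.appendleft(item_id)      # 2. only then push the ID
    return True

assert add_item("job-1", b"payload")
assert not add_item("job-1", b"other")   # duplicate is rejected
assert list(main_queue) == ["job-1"]     # the queue length stays accurate
```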
+ +If the item's data key already exists, you shouldn't push it to the *main queue* again, since this +will cause it to be counted twice when getting the queue length, which can have a negative impact on +queue-length based autoscaling. Furthermore, when the item is completed, its copy will still be in +the *main queue*, so leasing will take more iterations. + +If the ID is already known to be unique (for example UUIDs), you can safely pipeline these +operations and skip the check. + +### Leasing an item + +To fetch an item to work on, a worker should pop from the right of the *main queue*, and push to the +left of the *processing queue* (`rpoplpush`). + +The *processing queue* is used to track the items currently being processed, to make the +cleaning process more efficient. + +Furthermore, while an item is being processed, it has a *lease key*, which is +`<prefix>:lease:<id>`. The value of this is the session ID of the worker which got the lease. +Lease keys are set with an expiry; once the key has expired (or is otherwise deleted), the session +is deemed to have failed working on the item, and the item will be added, by the cleaning process, +to the end of the *main queue*. + +The lease function should therefore: + +- `rpoplpush` from the *main queue* to the *processing queue*, +- Load the data for this item + - and, if it doesn't exist, ignore the item, + - *(in our provided client implementations, if lease is requested to block, and there's no + timeout, the lease method just tries to get the next item. In any other case, the method returns + with no item)*, then +- set the *lease key*, with an expiry time longer than the expected job duration. + +### Completing an item + +When completing an item, you must: +- Remove the data key. + +You should also: +- Remove the *lease key*, and +- Remove the item ID from the *processing queue*.
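The completion steps can be sketched the same way (in-memory stand-ins with illustrative names; on a real database these would be `del` and `lrem` commands, ideally pipelined):

```python
from collections import deque

# In-memory stand-ins: one item currently leased by "session-a" (illustrative)
data = {":item:job-1": b"payload"}
leases = {":lease:job-1": "session-a"}
processing_queue = deque(["job-1"])

def complete(item_id: str) -> bool:
    """Delete the data key (the completing step), then tidy the lease and
    processing entry. Only the deletion of the data key decides the result."""
    removed = data.pop(f":item:{item_id}", None) is not None  # DEL data key
    leases.pop(f":lease:{item_id}", None)                     # DEL lease key
    if item_id in processing_queue:
        processing_queue.remove(item_id)                      # LREM from processing
    return removed

assert complete("job-1") is True    # this caller completed the item
assert complete("job-1") is False   # any later caller sees it already done
```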
+ +The item is considered to be removed exactly when the data key is removed, but the other steps keep +things tidy (without removing the lease, it would eventually expire, and the cleaning process would +later remove the item ID from the processing queue anyway). + +The completion methods also return a boolean: for any given item, exactly one complete call will +return true. This boolean can be decided by the output of the command to delete the data key. If it +deletes the item, this is the call that completed the item. Otherwise, another worker has already +completed it. + +### Cleaning + +Items are considered items only when their data key exists. + +If a lease does not exist for the item, then processing must have failed before the item was +completed, and the item should be available again for a lease. + +The cleaning process finds items: + +- That exist, +- Aren't in the *main queue*, and +- Don't have a lease. + +And then: + +- Removes them from the processing queue, and +- Pushes them to the *main queue*. + +The item should be removed from the processing queue first. + +#### Deep clean + +The deep cleaning process is exhaustive. It uses `keys` to enumerate all the data items for checking. + +#### Light clean + +Using `keys` can cause significant performance issues, so should ideally be avoided. + +This is why we have the *processing queue*. So long as all operations complete fully, any item with +an expired lease will be in the processing queue. We can therefore follow the usual cleaning +algorithm, but only using the item IDs from the *processing queue*. + +#### Cleaning schedule + +This is entirely up to you. Light cleans are quick, and can be carried out regularly. Deep cleans +can get very slow depending on the size of your queues, and so should be performed less often, but +should be performed to clean up cases where workers or cleaners have unexpectedly terminated in the +middle of redis operations.
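The *light clean* described above can be sketched as (in-memory stand-ins with illustrative names; a real implementation would issue `lrange`, `exists`, `lrem` and `lpush` commands):

```python
from collections import deque

# In-memory stand-ins: an abandoned item, a stale ID, and a leased item (illustrative)
data = {":item:lost": b"x", ":item:active": b"y"}
leases = {":lease:active": "session-a"}
main_queue = deque()
processing_queue = deque(["lost", "stale", "active"])

def light_clean() -> None:
    """Reset processing-queue entries whose lease has expired."""
    for item_id in list(processing_queue):
        if f":lease:{item_id}" in leases:
            continue                          # still leased: leave it alone
        processing_queue.remove(item_id)      # remove from processing first
        if f":item:{item_id}" in data:
            main_queue.appendleft(item_id)    # a real item: make it leasable again
        # otherwise there's no data key, so it isn't an item; just drop the ID

light_clean()
assert list(main_queue) == ["lost"]          # abandoned item is back in the queue
assert list(processing_queue) == ["active"]  # leased work untouched, stale ID dropped
```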
+ +*Light cleaning* should be run at an interval approximately equal to the shortest lease time you use. diff --git a/dotnet/RedisWorkQueue.pdf b/dotnet/RedisWorkQueue.pdf index d5fda4d..8e17edd 100644 Binary files a/dotnet/RedisWorkQueue.pdf and b/dotnet/RedisWorkQueue.pdf differ diff --git a/dotnet/RedisWorkQueue/README.md b/dotnet/RedisWorkQueue/README.md index f56c35d..0dd0432 100644 --- a/dotnet/RedisWorkQueue/README.md +++ b/dotnet/RedisWorkQueue/README.md @@ -9,46 +9,12 @@ readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md). ## Documentation -Below is a brief overview and an example. More details on the core concepts can be found in the -[readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md), and full API -documentation can be found in +Below is a brief example. More details on the core concepts can be found in the +[readme](https://github.com/MeVitae/redis-work-queue/blob/main/README.md), and +full API documentation can be found in [../RedisWorkQueue.pdf](https://github.com/MeVitae/redis-work-queue/blob/main/dotnet/RedisWorkQueue.pdf). -## WorkQueue - -The `WorkQueue` class represents a work queue backed by a Redis database. It provides methods to add -items to the queue, lease items from the queue, and mark completed items as done. - -### Properties - -- `Session`: Gets or sets the unique identifier for the current session. - -### Methods - -- `AddItem(IRedisClient db, Item item)`: Adds an item to the work queue. The `db` parameter is the - Redis instance and the `item` parameter is the item to be added. - -- `QueueLength(IRedisClient db)`: Gets the length of the main queue. The `db` parameter is the Redis - instance. - -- `Processing(IRedisClient db)`: Gets the length of the processing queue. The `db` parameter is the - Redis instance. - -- `LeaseExists(IRedisClient db, string itemId)`: Checks if a lease exists for the specified item ID.
- The `db` parameter is the Redis instance and the `itemId` parameter is the ID of the item to - check. - -- `Lease(IRedisClient db, int leaseSeconds, bool block, int timeout = 0)`: Requests a work lease - from the work queue. This should be called by a worker to get work to complete. The `db` parameter - is the Redis instance, the `leaseSeconds` parameter is the number of seconds to lease the item for, - the `block` parameter indicates whether to block and wait for an item to be available if the main - queue is empty, and the `timeout` parameter is the maximum time to block in seconds. - -- `Complete(IRedisClient db, Item item)`: Marks a job as completed and removes it from the work - queue. The `db` parameter is the Redis instance and the `item` parameter is the item to be - completed. - -### Example Usage +## Example Usage ```csharp using FreeRedis; diff --git a/dotnet/RedisWorkQueue/RedisWorkQueue.csproj b/dotnet/RedisWorkQueue/RedisWorkQueue.csproj index fd0346f..e2872a3 100644 --- a/dotnet/RedisWorkQueue/RedisWorkQueue.csproj +++ b/dotnet/RedisWorkQueue/RedisWorkQueue.csproj @@ -1,10 +1,10 @@ - net6.0;net7.0 + net6.0;net7.0;net8.0 enable MeVitae.RedisWorkQueue - 0.2.1 + 0.3.0 Jacob O'Toole, Nathan Lamplough, Ilie Mihai Alexandru MeVitae https://github.com/MeVitae/redis-work-queue @@ -22,7 +22,7 @@ - + diff --git a/dotnet/RedisWorkQueue/WorkQueue.cs b/dotnet/RedisWorkQueue/WorkQueue.cs index b1e1687..782e8fa 100644 --- a/dotnet/RedisWorkQueue/WorkQueue.cs +++ b/dotnet/RedisWorkQueue/WorkQueue.cs @@ -1,5 +1,6 @@ using System; using System.Text; +using System.Linq; using FreeRedis; @@ -44,24 +45,40 @@ public WorkQueue(KeyPrefix name) this.Session = name.Of(Guid.NewGuid().ToString()); this.MainQueueKey = name.Of(":queue"); this.ProcessingKey = name.Of(":processing"); - this.LeaseKey = name.Concat(":leased_by_session:"); + this.LeaseKey = name.Concat(":lease:"); this.ItemDataKey = name.Concat(":item:"); } /// - /// Adds item to the work queue. 
+ /// Add an item to the work queue. + /// + /// If an item with the same ID already exists, this item is not added, and `false` is returned. Otherwise, if the item is added `true` is returned. + /// + /// If you know the item ID is unique, and not already in the queue, use the optimised WorkQueue.AddUniqueItem instead. /// /// Redis instance. /// Item to be added. - public void AddItem(IRedisClient db, Item item) + public bool AddItem(IRedisClient db, Item item) { - using (var pipe = db.StartPipe()) + if (db.SetNx(ItemDataKey.Of(item.ID), item.Data)) { - pipe.Set(ItemDataKey.Of(item.ID), item.Data); - pipe.LPush(MainQueueKey, item.ID); - - pipe.EndPipe(); + db.LPush(MainQueueKey, item.ID); + return true; } + return false; + } + + /// + /// Add an item, which is known to have an ID not already in the queue. + /// + /// Redis instance. + /// Item to be added. + public void AddUniqueItem(IRedisClient db, Item item) + { + using var pipe = db.StartPipe(); + pipe.Set(ItemDataKey.Of(item.ID), item.Data); + pipe.LPush(MainQueueKey, item.ID); + pipe.EndPipe(); } /// @@ -90,7 +107,7 @@ public long Processing(IRedisClient db) /// The Redis client instance. /// The ID of the item to check. /// True if lease exists, false otherwise. - public bool LeaseExists(IRedisClient db, string itemId) + private bool LeaseExists(IRedisClient db, string itemId) { return db.Exists(LeaseKey.Of(itemId)); } @@ -98,74 +115,129 @@ public bool LeaseExists(IRedisClient db, string itemId) /// /// Request a work lease from the work queue. This should be called by a worker to get work to complete. + /// /// When completed, the `complete` method should be called. + /// /// If `block` is true, the function will return either when a job is leased or after `timeout` seconds if `timeout` isn't 0. + /// /// If the job is not completed before the end of `leaseDuration`, another worker may pick up the same job. + /// /// It is not a problem if a job is marked as `done` more than once. 
- ///If you haven't already, it's worth reading the documentation on leasing items: + /// + /// If you haven't already, it's worth reading the documentation on leasing items: /// https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item /// /// The Redis client instance. /// The number of seconds to lease the item for. /// Indicates whether to block and wait for an item to be available if the main queue is empty. - /// The maximum time to block in seconds. + /// The maximum time to block in seconds. If 0, there is no timeout. /// The leased item, or null if no item is available. public Item? Lease(IRedisClient db, int leaseSeconds, bool block, int timeout = 0) { - object maybeItemId; - if (block) - { - maybeItemId = db.BRPopLPush(MainQueueKey, ProcessingKey, timeout); - } - else + for (; ; ) { - maybeItemId = db.RPopLPush(MainQueueKey, ProcessingKey); + object maybeItemId; + if (block) + maybeItemId = db.BRPopLPush(MainQueueKey, ProcessingKey, timeout); + else + maybeItemId = db.RPopLPush(MainQueueKey, ProcessingKey); + + if (maybeItemId == null) + return null; + + string itemId; + if (maybeItemId is byte[]) + itemId = Encoding.UTF8.GetString((byte[])maybeItemId); + else if (maybeItemId is string) + itemId = (string)maybeItemId; + else + throw new Exception("item id from work queue not bytes or string"); + + var data = db.Get(ItemDataKey.Of(itemId)); + if (data == null) + { + if (block && timeout == 0) + continue; + return null; + } + + db.SetEx(LeaseKey.Of(itemId), leaseSeconds, Encoding.UTF8.GetBytes(Session)); + + return new Item(data, itemId); } - - if (maybeItemId == null) - return null; - - string itemId; - if (maybeItemId is byte[]) - itemId = Encoding.UTF8.GetString((byte[])maybeItemId); - else if (maybeItemId is string) - itemId = (string)maybeItemId; - else - throw new Exception("item id from work queue not bytes or string"); - - var data = db.Get(ItemDataKey.Of(itemId)); - if (data == null) - data = new byte[0]; - 
db.SetEx(LeaseKey.Of(itemId), leaseSeconds, Encoding.UTF8.GetBytes(Session)); - - return new Item(data, itemId); } /// - /// Marks a job as completed and remove it from the work queue. + /// Marks a job as completed and remove it from the work queue. After `complete` has been + /// called (and returns `true`), no workers will receive this job again. /// /// The Redis client instance. /// The item to be completed. /// True if the item was successfully completed and removed, otherwise false. public bool Complete(IRedisClient db, Item item) { - var removed = db.LRem(ProcessingKey, 0, item.ID); - - if (removed == 0) - return false; + using var pipe = db.StartPipe(); + pipe.Del(ItemDataKey.Of(item.ID)); + pipe.LRem(ProcessingKey, 0, item.ID); + pipe.Del(LeaseKey.Of(item.ID)); + var results = pipe.EndPipe(); + return ((long)results[0]) != 0; + } - string itemId = item.ID; + public void LightClean(IRedisClient db) + { + // A light clean only checks items in the processing queue + var processing = db.LRange(ProcessingKey, 0, -1); + foreach (string itemId in processing) + { + // If there's no lease for the item, then it should be reset. 
+ if (!LeaseExists(db, itemId)) + { + // We also check the item actually exists before pushing it back to the main queue + if (db.Exists(ItemDataKey.Of(itemId))) + { + Console.WriteLine($"{itemId} has no lease, it will be reset"); + using var pipe = db.StartPipe(); + pipe.LRem(ProcessingKey, 0, itemId); + pipe.LPush(MainQueueKey, itemId); + pipe.EndPipe(); + } + else + { + Console.WriteLine($"{itemId} was in the processing queue but does not exist"); + db.LRem(ProcessingKey, 0, itemId); + } + } + } + } + public void DeepClean(IRedisClient db) + { + // A deep clean checks all data keys + string[] itemDataKeys; + string[] mainQueue; using (var pipe = db.StartPipe()) { - pipe.Del(ItemDataKey.Of(itemId)); - pipe.Del(LeaseKey.Of(itemId)); - - pipe.EndPipe(); + pipe.Keys(ItemDataKey.Of("*")); + pipe.LRange(MainQueueKey, 0, -1); + var results = pipe.EndPipe(); + itemDataKeys = (string[])results[0]; + mainQueue = (string[])results[1]; + } + foreach (string itemDataKey in itemDataKeys) + { + string itemId = itemDataKey.Substring(ItemDataKey.Prefix.Length); + // If the item isn't in the queue, and there's no lease for the item, then it should + // be reset. 
+ if (!mainQueue.Contains(itemId) && !LeaseExists(db, itemId)) + { + Console.WriteLine($"{itemId} has no lease, it will be reset"); + using var pipe = db.StartPipe(); + pipe.LRem(ProcessingKey, 0, itemId); + pipe.LPush(MainQueueKey, itemId); + pipe.EndPipe(); + } } - - return true; } } } diff --git a/go/WorkQueue.go b/go/WorkQueue.go index 3286bb3..52947b6 100644 --- a/go/WorkQueue.go +++ b/go/WorkQueue.go @@ -66,6 +66,8 @@ package workqueue import ( "context" + "fmt" + "slices" "time" "github.com/google/uuid" @@ -93,15 +95,28 @@ func NewWorkQueue(name KeyPrefix) WorkQueue { session: uuid.NewString(), mainQueueKey: name.Of(":queue"), processingKey: name.Of(":processing"), - leaseKey: name.Concat(":leased_by_session:"), + leaseKey: name.Concat(":lease:"), itemDataKey: name.Concat(":item:"), } } -// AddItemToPipeline adds an item to the work queue. This adds the redis commands onto the pipeline passed. +// AddItem to the work queue. +// +// If an item with the same ID already exists, this item is not added, and false is returned. Otherwise, if the item is added true is returned. +// +// If you know the item ID is unique, and not already in the queue, use the optimised WorkQueue.AddUniqueItem instead. +func (workQueue *WorkQueue) AddItem(ctx context.Context, db *redis.Client, item Item) (bool, error) { + added, err := db.SetNX(ctx, workQueue.itemDataKey.Of(item.ID), item.Data, never).Result() + if added { + err = db.LPush(ctx, workQueue.mainQueueKey, item.ID).Err() + } + return added, err +} + +// AddItemToPipeline adds an item, which is known to have an ID not already in the queue, to the work queue. This adds the redis commands onto the pipeline passed. // -// Use [WorkQueue.AddItem] if you don't want to pass a pipeline directly. -func (workQueue *WorkQueue) AddItemToPipeline(ctx context.Context, pipeline redis.Pipeliner, item Item) { +// Use [WorkQueue.AddUniqueItem] if you don't want to pass a pipeline directly. 
+func (workQueue *WorkQueue) AddUniqueItemToPipeline(ctx context.Context, pipeline redis.Pipeliner, item Item) { // Add the item data // NOTE: it's important that the data is added first, otherwise someone could pop the item // before the data is ready @@ -110,12 +125,12 @@ func (workQueue *WorkQueue) AddItemToPipeline(ctx context.Context, pipeline redi pipeline.LPush(ctx, workQueue.mainQueueKey, item.ID) } -// AddItem to the work queue. +// AddItem, which is known to have an ID not already in the queue, to the work queue. // // This creates a pipeline and executes it on the database. -func (workQueue *WorkQueue) AddItem(ctx context.Context, db *redis.Client, item Item) error { +func (workQueue *WorkQueue) AddUniqueItem(ctx context.Context, db *redis.Client, item Item) error { pipeline := db.Pipeline() - workQueue.AddItemToPipeline(ctx, pipeline, item) + workQueue.AddUniqueItemToPipeline(ctx, pipeline, item) _, err := pipeline.Exec(ctx) return err } @@ -151,36 +166,119 @@ func (workQueue *WorkQueue) Lease( timeout time.Duration, leaseDuration time.Duration, ) (*Item, error) { - // First, to get an item, we try to move an item from the main queue to the processing list. - var command *redis.StringCmd - if block { - command = db.BRPopLPush(ctx, workQueue.mainQueueKey, workQueue.processingKey, timeout) - } else { - command = db.RPopLPush(ctx, workQueue.mainQueueKey, workQueue.processingKey) - } - itemId, err := command.Result() - if itemId == "" || err != nil { - // A nil error indicates no job available - if err == redis.Nil { - return nil, nil + for { + // First, to get an item, we try to move an item from the main queue to the processing list. 
+ var command *redis.StringCmd + if block { + command = db.BRPopLPush(ctx, workQueue.mainQueueKey, workQueue.processingKey, timeout) + } else { + command = db.RPopLPush(ctx, workQueue.mainQueueKey, workQueue.processingKey) } - return nil, err + itemId, err := command.Result() + if itemId == "" || err != nil { + // A nil error indicates no job available + if err == redis.Nil { + return nil, nil + } + return nil, err + } + + // Get the item's data + data, err := db.Get(ctx, workQueue.itemDataKey.Of(itemId)).Bytes() + if err != nil { + if err == redis.Nil { + if block && timeout == 0 { + continue + } + return nil, nil + } + return nil, err + } + + // Now setup the lease item. + // NOTE: Racing for a lease is ok + err = db.SetEx(ctx, workQueue.leaseKey.Of(itemId), workQueue.session, leaseDuration).Err() + + return &Item{ + ID: itemId, + Data: data, + }, err } +} + +func (workQueue *WorkQueue) leaseExists(ctx context.Context, db *redis.Client, itemId string) (bool, error) { + exists, err := db.Exists(ctx, workQueue.leaseKey.Of(itemId)).Result() + return exists > 0, err +} - // Get the item's data - data, err := db.Get(ctx, workQueue.itemDataKey.Of(itemId)).Bytes() +func (workQueue *WorkQueue) LightClean(ctx context.Context, db *redis.Client) error { + // A light clean only checks items in the processing queue + itemIds, err := db.LRange(ctx, workQueue.processingKey, 0, -1).Result() if err != nil { - return nil, err + return fmt.Errorf("failed to list processing queue: %w", err) } + for _, itemId := range itemIds { + leaseExists, err := workQueue.leaseExists(ctx, db, itemId) + if err != nil { + return fmt.Errorf("failed to check if lease exists for %s: %w", itemId, err) + } + // If there's no lease for the item, then it should be reset. 
+ if !leaseExists { + // We also check that the item actually exists before pushing it back to the main queue + itemExists, err := db.Exists(ctx, workQueue.itemDataKey.Of(itemId)).Result() + if err != nil { + return fmt.Errorf("failed to check if item %s exists: %w", itemId, err) + } + if itemExists != 0 { + fmt.Println(itemId, "has no lease, it will be reset") + pipeline := db.Pipeline() + pipeline.LRem(ctx, workQueue.processingKey, 0, itemId) + pipeline.LPush(ctx, workQueue.mainQueueKey, itemId) + _, err := pipeline.Exec(ctx) + if err != nil { + return fmt.Errorf("failed to move %s from processing queue to main queue: %w", itemId, err) + } + } else { + fmt.Println(itemId, "was in the processing queue but does not exist") + err = db.LRem(ctx, workQueue.processingKey, 0, itemId).Err() + if err != nil { + return fmt.Errorf("failed to remove %s from processing queue: %w", itemId, err) + } + } + } + } + return nil +} - - // Now setup the lease item. - // NOTE: Racing for a lease is ok - err = db.SetEx(ctx, workQueue.leaseKey.Of(itemId), workQueue.session, leaseDuration).Err() - - return &Item{ - ID: itemId, - Data: data, - }, err +func (workQueue *WorkQueue) DeepClean(ctx context.Context, db *redis.Client) error { + // A deep clean checks all data keys + pipeline := db.Pipeline() + itemDataKeys := pipeline.Keys(ctx, workQueue.itemDataKey.Of("*")) + mainQueueRes := pipeline.LRange(ctx, workQueue.mainQueueKey, 0, -1) + _, err := pipeline.Exec(ctx) + if err != nil { + return fmt.Errorf("failed to list item data keys: %w", err) + } + mainQueue := mainQueueRes.Val() + for _, itemDataKey := range itemDataKeys.Val() { + itemId := itemDataKey[len(workQueue.itemDataKey):] + leaseExists, err := workQueue.leaseExists(ctx, db, itemId) + if err != nil { + return fmt.Errorf("failed to check if lease exists for %s: %w", itemId, err) + } + // If the item isn't in the queue, and there's no lease for the item, then it should be reset.
+ if !slices.Contains(mainQueue, itemId) && !leaseExists { + fmt.Println(itemId, "has no lease, it will be reset") + pipeline := db.Pipeline() + pipeline.LRem(ctx, workQueue.processingKey, 0, itemId) + pipeline.LPush(ctx, workQueue.mainQueueKey, itemId) + _, err := pipeline.Exec(ctx) + if err != nil { + return fmt.Errorf("failed to move %s from processing queue to main queue: %w", itemId, err) + } + } + } + return nil } // Complete marks a job as completed and remove it from the work queue. After Complete has been @@ -190,17 +288,10 @@ func (workQueue *WorkQueue) Lease( // first worker to call Complete*. So, while lease might give the same job to multiple workers, // complete will return true for only one worker. func (workQueue *WorkQueue) Complete(ctx context.Context, db *redis.Client, item *Item) (bool, error) { - removed, err := db.LRem(ctx, workQueue.processingKey, 0, item.ID).Result() - if removed == 0 || err != nil { - return false, err - } - // If we did actually remove it, delete the item data and lease. - // If we didn't really remove it, it's probably been returned to the work queue so the data is - // still needed and the lease might not be ours (if it is still ours, it'll expire anyway). 
- _, err = db.Pipelined(ctx, func(pipeline redis.Pipeliner) error { - pipeline.Del(ctx, workQueue.itemDataKey.Of(item.ID)) - pipeline.Del(ctx, workQueue.leaseKey.Of(item.ID)) - return nil - }) - return true, err + pipeline := db.Pipeline() + delResult := pipeline.Del(ctx, workQueue.itemDataKey.Of(item.ID)) + pipeline.LRem(ctx, workQueue.processingKey, 0, item.ID) + pipeline.Del(ctx, workQueue.leaseKey.Of(item.ID)) + _, err := pipeline.Exec(ctx) + return delResult.Val() != 0, err } diff --git a/go/go.mod b/go/go.mod index ebce525..7663450 100644 --- a/go/go.mod +++ b/go/go.mod @@ -3,11 +3,11 @@ module github.com/mevitae/redis-work-queue/go go 1.20 require ( - github.com/google/uuid v1.4.0 - github.com/redis/go-redis/v9 v9.3.0 + github.com/google/uuid v1.6.0 + github.com/redis/go-redis/v9 v9.6.1 ) require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect ) diff --git a/go/go.sum b/go/go.sum index c34322c..f313488 100644 --- a/go/go.sum +++ b/go/go.sum @@ -2,9 +2,15 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0= github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= +github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= diff --git a/node/package.json b/node/package.json index c1127bd..7f899c7 100644 --- a/node/package.json +++ b/node/package.json @@ -1,6 +1,6 @@ { "name": "@mevitae/redis-work-queue", - "version": "0.1.5", + "version": "0.3.0", "main": "./src/WorkQueue.ts", "description": "A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#).", "keywords": [ diff --git a/node/src/KeyPrefix.ts b/node/src/KeyPrefix.ts index 62a05e4..e537ed3 100644 --- a/node/src/KeyPrefix.ts +++ b/node/src/KeyPrefix.ts @@ -18,7 +18,7 @@ export class KeyPrefix { /** * KeyPrefix instance prefix. */ - private prefix: string; + prefix: string; /** * This creates a new instance with the prefix passed. 
diff --git a/node/src/WorkQueue.ts b/node/src/WorkQueue.ts index 5fb59ab..c72880f 100644 --- a/node/src/WorkQueue.ts +++ b/node/src/WorkQueue.ts @@ -17,7 +17,6 @@ export class WorkQueue { private session: string; private mainQueueKey: string; private processingKey: string; - private cleaningKey: string; private leaseKey: KeyPrefix; private itemDataKey: KeyPrefix; @@ -27,20 +26,40 @@ export class WorkQueue { constructor(name: KeyPrefix) { this.mainQueueKey = name.of(':queue'); this.processingKey = name.of(':processing'); - this.cleaningKey = name.of(':cleaning'); this.session = uuidv4(); - this.leaseKey = name.concat(':leased_by_session:'); + this.leaseKey = name.concat(':lease:'); this.itemDataKey = name.concat(':item:'); } /** - * Add an item to the work queue. This adds the redis commands onto the pipeline passed. - * Use `WorkQueue.addItem` if you don't want to pass a pipeline directly. - * Add the item data. - * @param {Pipeline} pipeline The pipeline that the data will be executed. - * @param {Item} item The Item which will be set in the Redis with the key of this.itemDataKey.of(item.id). + * Add an item to the work queue. + * + * If an item with the same ID already exists, this item is not added, and `false` is returned. + * Otherwise, if the item is added `true` is returned. + * + * If you know the item ID is unique, and not already in the queue, use the optimised + * `WorkQueue.addUniqueItem` instead. + * + * @param {Redis} db - The Redis Connection. + * @param {Item} item - The item to be added. + * @returns {Promise<boolean>} - A boolean indicating if the item was added. */ - addItemToPipeline(pipeline: ChainableCommander, item: Item) { + async addItem(db: Redis, item: Item): Promise<boolean> { + const added = await db.setnx(this.itemDataKey.of(item.id), item.data) > 0; + if (added) await db.lpush(this.mainQueueKey, item.id); + return added; + } + + /** + * Add an item, which is known to have an ID not already in the queue, to the work queue.
This + * adds the redis commands onto the pipeline passed. + * + * Use `WorkQueue.add_unique_item` if you don't want to pass a pipeline directly. + * + * @param {Pipeline} pipeline - The pipeline that the data will be executed. + * @param {Item} item - The Item to be added. + */ + addUniqueItemToPipeline(pipeline: ChainableCommander, item: Item) { // NOTE: it's important that the data is added first, otherwise someone before the data is ready. pipeline.set(this.itemDataKey.of(item.id), item.data); // Then add the id to the work queue @@ -48,32 +67,34 @@ export class WorkQueue { } /** - * Add an item to the work queue. + * Add an item, which is known to have an ID not already in the queue, to the work queue. + * * This creates a pipeline and executes it on the database. * - * @param {Redis} db The Redis Connection. - * @param item The item that will be executed using the method addItemToPipeline. + * @param {Redis} db - The Redis Connection. + * @param {Item} item - The item to be added. */ - async addItem(db: Redis, item: Item): Promise { + async addUniqueItem(db: Redis, item: Item): Promise { const pipeline = db.pipeline(); - this.addItemToPipeline(pipeline, item); + this.addUniqueItemToPipeline(pipeline, item); await pipeline.exec(); } /** - * This is used to get the length of the Main Queue. + * Return the length of the work queue (not including items being processed, see + * `WorkQueue.processing`). * - * @param {Redis} db The Redis Connection. - * @returns {Promise} Return the length of the work queue (not including items being processed, see `WorkQueue.processing()`). + * @param {Redis} db - The Redis Connection. + * @returns {Promise} The length of the work queue (not including items being processed). */ queueLen(db: Redis): Promise { return db.llen(this.mainQueueKey); } /** - * This is used to get the lenght of the Processing Queue. + * Return the number of items being processed. * - * @param {Redis} db The Redis Connection. 
+ * @param {Redis} db - The Redis Connection. * @returns {Promise<number>} The number of items being processed. */ processing(db: Redis): Promise<number> { @@ -81,138 +102,147 @@ export class WorkQueue { } /** - * This method can be used to check if a Lease Exists or not for a itemId. + * Request a work lease from the work queue. This should be called by a worker to get work to + * complete. When completed, the `complete` method should be called. * - * @param {Redis} db The Redis Connection. - * @param {string} itemId The itemId of the item you want to check if it has a lease. - * @returns {Promise<boolean>} - */ - async leaseExists(db: Redis, itemId: string): Promise<boolean> { - const exists = await db.exists(this.leaseKey.of(itemId)); - return exists !== 0; - } - - /** - * Request a work lease from the work queue. This should be called by a worker to get work to complete. - * When completed, the `complete` method should be called. + * If `block` is true, the function will return either when a job is leased or after `timeout` + * seconds if `timeout` isn't 0. * - * If `block` is true, the function will return either when a job is leased or after `timeout` seconds if `timeout` isn't 0. * If the job is not completed before the end of `leaseDuration`, another worker may pick up the same job. + * * It is not a problem if a job is marked as `done` more than once. * * If you haven't already, it's worth reading the documentation on leasing items: * https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item * - * @param {Redis} db The Redis Connection. - * @param {number} leaseSecs The number of seconds that the lease should hold. - * @param {boolean} block Is a block or not, default is true. - * @param {number} timeout The number of seconds the lease will time out at. - * @returns {Promise<Item | null>} Returns a new lease Item. - * - * Process: - * First, to get an item, we try to move an item from the main queue to the processing list. - * Then we setup the lease item.
+ * @param {Redis} db - The Redis Connection. + * @param {number} leaseSecs - The number of seconds that the lease should hold. + * @param {boolean} block - If false, the method will return immediately if there is no item. + * @param {number} timeout - The number of seconds the lease will time out at. + * @returns {Promise<Item | null>} The item to process! */ async lease( db: Redis, leaseSecs: number, block = true, - timeout = 1 + timeout = 0, ): Promise<Item | null> { - let maybeItemId: string | null = null; - - // Try to move an item from the main queue to the processing list. - if (block) { - maybeItemId = await db.brpoplpush( - this.mainQueueKey, - this.processingKey, - timeout - ); - } else { - maybeItemId = await db.rpoplpush(this.mainQueueKey, this.processingKey); - } + for (;;) { + let maybeItemId: string | null = null; + + // Try to move an item from the main queue to the processing list. + if (block) { + maybeItemId = await db.brpoplpush( + this.mainQueueKey, + this.processingKey, + timeout + ); + } else { + maybeItemId = await db.rpoplpush(this.mainQueueKey, this.processingKey); + } - if (maybeItemId == null) { - return null; - } + if (maybeItemId == null) { + return null; + } - const itemId = maybeItemId; + const itemId = maybeItemId; - let data: Buffer | null = await db.getBuffer(this.itemDataKey.of(itemId)); + let data: Buffer | null = await db.getBuffer(this.itemDataKey.of(itemId)); - if (data == null) { - data = Buffer.alloc(0); + if (data == null) { + if (block && timeout === 0) continue; + return null; + } + + // Setup the lease item. + await db.setex(this.leaseKey.of(itemId), leaseSecs, this.session); + return new Item(data, itemId); } + } - // Setup the lease item. - await db.setex(this.leaseKey.of(itemId), leaseSecs, this.session); - return new Item(data, itemId); + /** + * Marks a job as completed and remove it from the work queue. After `complete` has been called + * (and returns `true`), no workers will receive this job again.
+ * + * @param {Redis} db - The Redis connection. + * @param {Item} item - The Item to complete. + * @returns {boolean} a boolean indicating if *the job has been removed* **and** *this worker was the first worker to call `complete`*. So, while lease might give the same job to multiple workers, complete will return `true` for only one worker. + */ + async complete(db: Redis, item: Item): Promise<boolean> { + const results = await db.pipeline() + .del(this.itemDataKey.of(item.id)) + .lrem(this.processingKey, 0, item.id) + .del(this.leaseKey.of(item.id)) + .exec(); + return Boolean(results && results[0][1]); } /** - * Moves items from the processing Queue to the Main Queue if the lease key is missing. - * This can be used in case worker dies or crashes and item is hold onto the processing, this allows the item to be moved onto another worker. + * Check if a lease exists for an `itemId`. * - * @param {Redis} db The Redis connection. + * @param {Redis} db - The Redis Connection. + * @param {string} itemId + * @returns {Promise<boolean>} A boolean indicating if a lease exists for the item. + */ + private async leaseExists(db: Redis, itemId: string): Promise<boolean> { + const exists = await db.exists(this.leaseKey.of(itemId)); + return exists > 0; + } + + /** + * Check if an item exists with `itemId`. * - * Process Explenation: - * If the lease key is not present for an item (it expired or was never created because the client crashed before creating it), then move the item back to the main queue so others can work on it. - * While working on an item, we store it in the cleaning list. If we ever crash, we come back and check these items. + * @param {Redis} db - The Redis Connection. + * @param {string} itemId + * @returns {Promise<boolean>} A boolean indicating if the item exists.
*/ + private async itemExists(db: Redis, itemId: string): Promise<boolean> { + const exists = await db.exists(this.itemDataKey.of(itemId)); + return exists > 0; + } + async lightClean(db: Redis) { const processing: Array<string> = await db.lrange( this.processingKey, - 0, - -1 + 0, -1, ); for (const itemId of processing) { + // If there's no lease for the item, then it should be reset. if (!(await this.leaseExists(db, itemId))) { - await db.lpush(this.cleaningKey, itemId); - const removed = await db.lrem(this.processingKey, 0, itemId); - if (removed > 0) { - await db.lpush(this.mainQueueKey, 0, itemId); + // We also check that the item actually exists before pushing it back to the main queue + if (await this.itemExists(db, itemId)) { + console.log(itemId, "has no lease, it will be reset"); + await db.pipeline() + .lrem(this.processingKey, 0, itemId) + .lpush(this.mainQueueKey, itemId) + .exec(); + } else { + console.log(itemId, "was in the processing queue but does not exist"); + await db.lrem(this.processingKey, 0, itemId); } - await db.lrem(this.cleaningKey, 0, itemId); } } - - const forgot: Array<string> = await db.lrange(this.cleaningKey, 0, -1); - for (const itemId of forgot) { - const leaseExists: boolean = await this.leaseExists(db, itemId); - if ( - !leaseExists && - (await db.lpos(this.mainQueueKey, itemId)) == null && - (await db.lpos(this.processingKey, itemId)) == null - ) { - /** - * FIXME: this introduces a race - * maybe not anymore - * no, it still does, what if the job has been completed? - */ - await db.lpush(this.mainQueueKey, itemId); - } - await db.lrem(this.cleaningKey, 0, itemId); - } } - /** - * Marks a job as completed and remove it from the work queue. - * - * @param {Redis} db The Redis connection. - * @param {Item} item The Item which the processing got completed - * @returns {boolean} returns a boolean indicating if *the job has been removed* **and** *this worker was the first worker to call `complete`*.
So, while lease might give the same job to multiple workers, complete will return `true` for only one worker. - */ - async complete(db: Redis, item: Item): Promise<boolean> { - const removed = await db.lrem(this.processingKey, 0, item.id); - if (removed === 0) { - return false; + async deepClean(db: Redis) { + // A deep clean checks all data keys + const itemResults = await db.pipeline() + .keys(this.itemDataKey.of("*")) + .lrange(this.mainQueueKey, 0, -1) + .exec(); + if (!itemResults) throw new Error("pipeline did not return results"); + const [[_err_a, itemDataKeys], [_err_b, mainQueueUntyped]] = itemResults; + const mainQueue = mainQueueUntyped as string[]; + for (const itemDataKey of itemDataKeys as string[]) { + const itemId = itemDataKey.slice(this.itemDataKey.prefix.length); + // If the item isn't in the queue, and there's no lease for the item, then it should be reset. + if (!mainQueue.includes(itemId) && !(await this.leaseExists(db, itemId))) { + console.log(itemId, "has no lease, it will be reset"); + await db.pipeline() + .lrem(this.processingKey, 0, itemId) + .lpush(this.mainQueueKey, itemId) + .exec(); + } } - - const pipeline = db.pipeline(); - pipeline.del(this.itemDataKey.of(item.id)); - pipeline.del(this.leaseKey.of(item.id)); - await pipeline.exec(); - - return true; } } diff --git a/python/README.md b/python/README.md index f0e53b1..969faef 100644 --- a/python/README.md +++ b/python/README.md @@ -42,10 +42,18 @@ assert bytes_item.data_json() == [1, 2, 3] ``` ### Add an item to a work queue + ```python work_queue.add_item(db, item) ``` +If you know that items have unique IDs, which aren't the same as any already in the queue, you can +instead use: + +```python +work_queue.add_unique_item(db, item) +``` + ## Completing work Please read [the documentation on leasing and completing diff --git a/python/pyproject.toml b/python/pyproject.toml index 752757a..393699a 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = 
"setuptools.build_meta" [project] name = "redis-work-queue" -version = "0.1.5" +version = "0.3.0" description = "A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#)." dependencies = ["redis", "uuid"] authors = [ diff --git a/python/redis_work_queue/workqueue.py b/python/redis_work_queue/workqueue.py index 79b605e..ac1d530 100644 --- a/python/redis_work_queue/workqueue.py +++ b/python/redis_work_queue/workqueue.py @@ -13,14 +13,25 @@ def __init__(self, name: KeyPrefix): self._session = uuid.uuid4().hex self._main_queue_key = name.of(':queue') self._processing_key = name.of(':processing') - self._cleaning_key = name.of(':cleaning') - self._lease_key = KeyPrefix.concat(name, ':leased_by_session:') + self._lease_key = KeyPrefix.concat(name, ':lease:') self._item_data_key = KeyPrefix.concat(name, ':item:') - def add_item_to_pipeline(self, pipeline: Pipeline, item: Item) -> None: - """Add an item to the work queue. This adds the redis commands onto the pipeline passed. + def add_item(self, db: Redis, item: Item) -> bool: + """Add an item to the work queue. + + If an item with the same ID already exists, this item is not added, and False is returned. Otherwise, if the item is added True is returned. - Use `WorkQueue.add_item` if you don't want to pass a pipeline directly. + If you know the item ID is unique, and not already in the queue, use the optimised `add_unique_item` method instead. + """ + if db.setnx(self._item_data_key.of(item.id()), item.data()) != 0: + db.lpush(self._main_queue_key, item.id()) + return True + return False + + def add_unique_item_to_pipeline(self, pipeline: Pipeline, item: Item) -> None: + """Add an item, which is known to have an ID not already in the queue, to the work queue. This adds the redis commands onto the pipeline passed. + + Use `WorkQueue.add_unique_item` if you don't want to pass a pipeline directly. 
""" # Add the item data # NOTE: it's important that the data is added first, otherwise someone before the data is @@ -29,13 +40,13 @@ def add_item_to_pipeline(self, pipeline: Pipeline, item: Item) -> None: # Then add the id to the work queue pipeline.lpush(self._main_queue_key, item.id()) - def add_item(self, db: Redis, item: Item) -> None: - """Add an item to the work queue. + def add_unique_item(self, db: Redis, item: Item) -> None: + """Add an item, which is known to have an ID not already in the queue, to the work queue. This creates a pipeline and executes it on the database. """ pipeline = db.pipeline() - self.add_item_to_pipeline(pipeline, item) + self.add_unique_item_to_pipeline(pipeline, item) pipeline.execute() def queue_len(self, db: Redis) -> int: @@ -47,54 +58,13 @@ def processing(self, db: Redis) -> int: """Return the number of items being processed.""" return db.llen(self._processing_key) - def light_clean(self, db: Redis) -> None: - processing: list[bytes | str] = db.lrange( - self._processing_key, 0, -1) - for item_id in processing: - if isinstance(item_id, bytes): - item_id = item_id.decode('utf-8') - # If the lease key is not present for an item (it expired or was never created because - # the client crashed before creating it) then move the item back to the main queue so - # others can work on it. - if not self._lease_exists(db, item_id): - print(item_id, 'has no lease') - # While working on an item, we store it in the cleaning list. If we ever crash, we - # come back and check these items. 
- db.lpush(self._cleaning_key, item_id) - removed = int(db.lrem(self._processing_key, 0, item_id)) - if removed > 0: - db.lpush(self._main_queue_key, item_id) - print(item_id, 'was still in the processing queue, it was reset') - else: - print(item_id, 'was no longer in the processing queue') - db.lrem(self._cleaning_key, 0, item_id) - - # Now we check the - forgot: list[bytes | str] = db.lrange(self._cleaning_key, 0, -1) - for item_id in forgot: - if isinstance(item_id, bytes): - item_id = item_id.decode('utf-8') - print(item_id, 'was forgotten in clean') - if not self._lease_exists(db, item_id) and \ - db.lpos(self._main_queue_key, item_id) is None and \ - db.lpos(self._processing_key, item_id) is None: - # FIXME: this introcudes a race - # maybe not anymore - # no, it still does, what if the job has been completed? - db.lpush(self._main_queue_key, item_id) - print(item_id, 'was not in any queue, it was reset') - db.lrem(self._cleaning_key, 0, item_id) - - def _lease_exists(self, db: Redis, item_id: str) -> bool: - """True iff a lease on 'item_id' exists.""" - return db.exists(self._lease_key.of(item_id)) != 0 - def lease(self, db: Redis, lease_secs: int, block=True, timeout=0) -> Item | None: """Request a work lease the work queue. This should be called by a worker to get work to complete. When completed, the `complete` method should be called. If `block` is true, the function will return either when a job is leased or after `timeout` - if `timeout` isn't 0. + if `timeout` isn't 0. (If `timeout` isn't 0, this may return earlier, with `None` in some + race cases). If the job is not completed before the end of `lease_duration`, another worker may pick up the same job. It is not a problem if a job is marked as `done` more than once. 
@@ -102,39 +72,42 @@ def lease(self, db: Redis, lease_secs: int, block=True, timeout=0) -> Item | Non If you've not already done it, it's worth reading [the documentation on leasing items](https://github.com/MeVitae/redis-work-queue/blob/main/README.md#leasing-an-item). """ - # First, to get an item, we try to move an item from the main queue to the processing list. - if block: - maybe_item_id: bytes | str | None = db.brpoplpush( - self._main_queue_key, - self._processing_key, - timeout=timeout, - ) - else: - maybe_item_id: bytes | str | None = db.rpoplpush( - self._main_queue_key, - self._processing_key, - ) - - if maybe_item_id is None: - return None - - # Make sure the item id is a string - if isinstance(maybe_item_id, bytes): - item_id: str = maybe_item_id.decode('utf-8') - elif isinstance(maybe_item_id, str): - item_id: str = maybe_item_id - else: - raise Exception("item id from work queue not bytes or string") - - # If we got an item, fetch the associated data. - data: bytes | None = db.get(self._item_data_key.of(item_id)) - if data is None: - data = bytes() - - # Setup the lease item. - # NOTE: Racing for a lease is ok. - db.setex(self._lease_key.of(item_id), lease_secs, self._session) - return Item(data, id=item_id) + while True: + # First, to get an item, we try to move an item from the main queue to the processing list. + if block: + maybe_item_id: bytes | str | None = db.brpoplpush( + self._main_queue_key, + self._processing_key, + timeout=timeout, + ) + else: + maybe_item_id: bytes | str | None = db.rpoplpush( + self._main_queue_key, + self._processing_key, + ) + + if maybe_item_id is None: + return None + + # Make sure the item id is a string + if isinstance(maybe_item_id, bytes): + item_id: str = maybe_item_id.decode('utf-8') + elif isinstance(maybe_item_id, str): + item_id: str = maybe_item_id + else: + raise Exception("item id from work queue not bytes or string") + + # If we got an item, fetch the associated data. 
+ data: bytes | None = db.get(self._item_data_key.of(item_id)) + if data is None: + if block and timeout == 0: + continue + return None + + # Setup the lease item. + # NOTE: Racing for a lease is ok. + db.setex(self._lease_key.of(item_id), lease_secs, self._session) + return Item(data, id=item_id) def complete(self, db: Redis, item: Item) -> bool: """Marks a job as completed and remove it from the work queue. After `complete` has been @@ -143,17 +116,63 @@ def complete(self, db: Redis, item: Item) -> bool: `complete` returns a boolean indicating if *the job has been removed* **and** *this worker was the first worker to call `complete`*. So, while lease might give the same job to multiple workers, complete will return `true` for only one worker.""" - removed = int(db.lrem(self._processing_key, 0, item.id())) - # Only complete the work if it was still in the processing queue - if removed == 0: - return False item_id = item.id() - # TODO: The cleaner should also handle these... :( - db.pipeline() \ + item_del_result, _, _ = db.pipeline() \ .delete(self._item_data_key.of(item_id)) \ + .lrem(self._processing_key, 0, item.id()) \ .delete(self._lease_key.of(item_id)) \ .execute() - return True + return item_del_result is not None and item_del_result != 0 + + def _lease_exists(self, db: Redis, item_id: str) -> bool: + """True iff a lease on 'item_id' exists.""" + return db.exists(self._lease_key.of(item_id)) != 0 + + def light_clean(self, db: Redis) -> None: + # A light clean only checks items in the processing queue + processing: list[bytes | str] = db.lrange( + self._processing_key, 0, -1, + ) + for item_id in processing: + item_id = _value_str(item_id) + # If there's no lease for the item, then it should be reset. 
+ if not self._lease_exists(db, item_id): + # We also check the item actually exists before pushing it back to the main queue + if db.exists(self._item_data_key.of(item_id)): + print(item_id, 'has no lease, it will be reset') + db.pipeline() \ + .lrem(self._processing_key, 0, item_id) \ + .lpush(self._main_queue_key, item_id) \ + .execute() + else: + print(item_id, 'was in the processing queue but does not exist') + db.lrem(self._processing_key, 0, item_id) + + def deep_clean(self, db: Redis) -> None: + # A deep clean checks all data keys + res: tuple[list[bytes | str], list[bytes | str]] = db.pipeline() \ + .keys(self._item_data_key.of('*')) \ + .lrange(self._main_queue_key, 0, -1) \ + .execute() + item_data_keys, main_queue = res + main_queue = list(map(_value_str, main_queue)) + for item_data_key in item_data_keys: + item_id = _value_str(item_data_key)[len(self._item_data_key.prefix):] + # If the item isn't in the queue, and there's no lease for the item, then it should be + # reset. + if item_id not in main_queue and not self._lease_exists(db, item_id): + print(item_id, 'has no lease, it will be reset') + db.pipeline() \ + .lrem(self._processing_key, 0, item_id) \ + .lpush(self._main_queue_key, item_id) \ + .execute() + + +def _value_str(value: bytes | str) -> str: + if isinstance(value, bytes): + return value.decode('utf-8') + assert isinstance(value, str) + return value __version__ = "0.1.2" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 9e4ef0e..1c2582d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "redis-work-queue" -version = "0.1.6" +version = "0.3.0" edition = "2021" license = "MIT" description = "A work queue, on top of a redis database, with implementations in Python, Rust, Go, Node.js (TypeScript) and Dotnet (C#)." 
@@ -17,7 +17,7 @@ serde_json = "1" futures = "0.3" [dependencies.redis] -version = "0.23" +version = "0.26" features = ["aio", "async-std-comp", "connection-manager"] [dependencies.serde] diff --git a/rust/src/lib.rs b/rust/src/lib.rs index fa57529..3d6f915 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -261,15 +261,32 @@ impl WorkQueue { main_queue_key: name.of(":queue"), processing_key: name.of(":processing"), //cleaning_key: name.of(":cleaning"), - lease_key: name.and(":leased_by_session:"), + lease_key: name.and(":lease:"), item_data_key: name.and(":item:"), } } - /// Add an item to the work queue. This adds the redis commands onto the pipeline passed. + /// Add an item to the work queue. + /// + /// If an item with the same ID already exists, this item is not added, and `false` is returned. Otherwise, if the item is added `true` is returned. /// - /// Use [`WorkQueue::add_item`] if you don't want to pass a pipeline directly. - pub fn add_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) { + /// If you know the item ID is unique, and not already in the queue, use the optimised + /// [`WorkQueue::add_unique_item`] instead. + pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> { + let added = db + .set_nx(self.item_data_key.of(&item.id), item.data.as_ref()) + .await?; + if added { + db.lpush(&self.main_queue_key, &item.id).await?; + } + Ok(added) + } + + /// Add an item, which is known to have an ID not already in the queue, to the work queue. This + /// adds the redis commands onto the pipeline passed. + /// + /// Use [`WorkQueue::add_unique_item`] if you don't want to pass a pipeline directly.
+ pub fn add_unique_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) { // Add the item data // NOTE: it's important that the data is added first, otherwise someone could pop the item // before the data is ready @@ -278,12 +295,16 @@ impl WorkQueue { pipeline.lpush(&self.main_queue_key, &item.id); } - /// Add an item to the work queue. + /// Add an item, which is known to have an ID not already in the queue, to the work queue. /// /// This creates a pipeline and executes it on the database. - pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<()> { + pub async fn add_unique_item<C: AsyncCommands>( + &self, + db: &mut C, + item: &Item, + ) -> RedisResult<()> { let mut pipeline = Box::new(redis::pipe()); - self.add_item_to_pipeline(&mut pipeline, item); + self.add_unique_item_to_pipeline(&mut pipeline, item); pipeline.query_async(db).await } @@ -322,44 +343,48 @@ impl WorkQueue { timeout: Option<Duration>, lease_duration: Duration, ) -> RedisResult<Option<Item>> { - // First, to get an item, we try to move an item from the main queue to the processing list. - let item_id: Option<String> = match timeout { - Some(Duration::ZERO) => { - db.rpoplpush(&self.main_queue_key, &self.processing_key) + loop { + // First, to get an item, we try to move an item from the main queue to the processing list. + let Some(item_id): Option<String> = (match timeout { + Some(Duration::ZERO) => { + db.rpoplpush(&self.main_queue_key, &self.processing_key) + .await? + } + _ => { + db.brpoplpush( + &self.main_queue_key, + &self.processing_key, + timeout.map(|d| d.as_secs() as f64).unwrap_or(0.), + ) .await? - } - _ => { - db.brpoplpush( - &self.main_queue_key, - &self.processing_key, - timeout.map(|d| d.as_secs() as usize).unwrap_or(0), - ) - .await? - } - }; + } + }) else { + return Ok(None); + }; + + // If we got an item, fetch the associated data. + let item_data: Vec<u8> = match db.get(self.item_data_key.of(&item_id)).await?
{ + Some(item_data) => item_data, + // If the item doesn't actually exist, and there's no timeout, just try again. + None if timeout == None => continue, + // If there was a timeout, we return early. + None => return Ok(None), + }; + + // Now setup the lease item. + // NOTE: Racing for a lease is ok + db.set_ex( + self.lease_key.of(&item_id), + &self.session, + lease_duration.as_secs(), + ) + .await?; - // If we got an item, fetch the associated data. - let item = match item_id { - Some(item_id) => Item { - data: db - .get::<_, Vec<u8>>(self.item_data_key.of(&item_id)) - .await? - .into_boxed_slice(), + return Ok(Some(Item { + data: item_data.into_boxed_slice(), id: item_id, - }, - None => return Ok(None), - }; - - // Now setup the lease item. - // NOTE: Racing for a lease is ok - db.set_ex( - self.lease_key.of(&item.id), - &self.session, - lease_duration.as_secs() as usize, - ) - .await?; - - Ok(Some(item)) + })); + } } /// Marks a job as completed and remove it from the work queue. After `complete` has been called @@ -369,20 +394,17 @@ impl WorkQueue { /// was the first worker to call `complete`*. So, while lease might give the same job to /// multiple workers, complete will return `true` for only one worker. pub async fn complete<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> { - let removed: usize = db.lrem(&self.processing_key, 0, &item.id).await?; - if removed == 0 { - return Ok(false); - } // If we did actually remove it, delete the item data and lease. // If we didn't really remove it, it's probably been returned to the work queue so the // data is still needed and the lease might not be ours (if it is still ours, it'll // expire anyway).
- redis::pipe() + let (items_deleted, (), ()): (usize, (), ()) = redis::pipe() .del(self.item_data_key.of(&item.id)) + .lrem(&self.processing_key, 0, &item.id) .del(self.lease_key.of(&item.id)) .query_async(db) .await?; - Ok(true) + Ok(items_deleted > 0) } } diff --git a/scripts/clear-leases b/scripts/clear-leases index b696e9e..4e82370 100755 --- a/scripts/clear-leases +++ b/scripts/clear-leases @@ -15,4 +15,4 @@ if [ "$QUEUE" == "*" ]; then else echo Deleting all lease for queue: $QUEUE. fi -redis-cli --raw KEYS "$QUEUE:leased_by_session:*" | xargs redis-cli DEL +redis-cli --raw KEYS "$QUEUE:lease:*" | xargs redis-cli DEL diff --git a/tests/README.md b/tests/README.md index c0c192a..eed7206 100644 --- a/tests/README.md +++ b/tests/README.md @@ -18,6 +18,18 @@ From the `tests` directory, to run the integration tests with all languages, use ./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs ``` +To do the same, but with the DotNet implementation of the cleaner, use: + +```bash +./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs -c ./dotnet-cleaner/run.sh +``` + +To do the same, but with the Go implementation of the cleaner, use: + +```bash +./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs -c ./go-cleaner/run.sh +``` + For a summary of other options, run: ```bash @@ -38,7 +50,7 @@ For example: ##### --host -This can be used to set a specific redis server, the default is `localhost`. +This can be used to set a specific redis server, the default is `localhost:6379`. For example: @@ -46,6 +58,11 @@ For example: ./run-test.sh --tests "go_jobs,python_jobs" --host example.server.net:port ``` +##### --cleaner + +This sets a custom binary to be used to clean the work queues, see the docs in +[job-spawner-and-cleaner.py](./job-spawner-and-cleaner.py). + ## Unit tests also exist Each client implementation contains some unit tests.
These are located within the implementations diff --git a/tests/add-item-tests/check.py b/tests/add-item-tests/check.py new file mode 100644 index 0000000..68879ce --- /dev/null +++ b/tests/add-item-tests/check.py @@ -0,0 +1,32 @@ +import sys +import random +from time import sleep + +import redis +from redis import Redis, WatchError + +sys.path.insert(0, '../../python') +from redis_work_queue import KeyPrefix, Item, WorkQueue + + +if len(sys.argv) < 2: + raise Exception("first command line argument must be redis host") + +host = sys.argv[1].split(":") +db = redis.Redis(host=host[0], port=int(host[1]) if len(host) > 1 else 6379) + +python_results_key = KeyPrefix("results:python:") +shared_results_key = KeyPrefix("results:shared:") + +python_queue = WorkQueue(KeyPrefix("python_jobs")) +shared_queue = WorkQueue(KeyPrefix("shared_jobs")) + +def check_queue(queue: WorkQueue): + """Ensure a queue has exactly 200 items, with ID 0, 1, 2, ..., 199.""" + remaining = list(range(0, 200)) + for item in db.lrange(queue._main_queue_key, 0, -1): + remaining.remove(int(item)) + assert len(remaining) == 0 + +check_queue(python_queue) +check_queue(shared_queue) diff --git a/tests/add-item-tests/python-tests.py b/tests/add-item-tests/python-tests.py new file mode 100644 index 0000000..fc5d3d5 --- /dev/null +++ b/tests/add-item-tests/python-tests.py @@ -0,0 +1,121 @@ +from abc import ABC, abstractmethod +import sys +import random +from time import sleep + +import redis +from redis import Redis, WatchError + +sys.path.insert(0, '../../python') +from redis_work_queue import KeyPrefix, Item, WorkQueue + + +if len(sys.argv) < 2: + raise Exception("first command line argument must be redis host") + +host = sys.argv[1].split(":") +db = redis.Redis(host=host[0], port=int(host[1]) if len(host) > 1 else 6379) + + +class ItemAdder(ABC): + """An abstract class containing the `add_item` method to add an item to a work queue.
+
+    After running some tests, the `check` method should be called to ensure that all cases within
+    `add_item` actually occurred. Before running another set of tests, `reset` should be called."""
+    @abstractmethod
+    def add_item(self, queue: WorkQueue, db: Redis, item: Item) -> bool:
+        ...
+
+    @abstractmethod
+    def check(self) -> bool:
+        ...
+
+    @abstractmethod
+    def reset(self):
+        ...
+
+
+class AddUniqueItem(ItemAdder):
+    """An ItemAdder using the `WorkQueue.add_unique_item` method, with no checks."""
+
+    def __init__(self):
+        pass
+
+    def add_item(self, queue: WorkQueue, db: Redis, item: Item) -> bool:
+        queue.add_unique_item(db, item)
+        return True
+
+    def check(self) -> bool:
+        return True
+
+    def reset(self):
+        pass
+
+
+class AddItem(ItemAdder):
+    """An ItemAdder using the default `WorkQueue.add_item` method, which checks
+    if, at some point during the test, `add_item` has returned both `True`
+    and `False`."""
+
+    def __init__(self):
+        self.reset()
+
+    def add_item(self, queue: WorkQueue, db: Redis, item: Item) -> bool:
+        if queue.add_item(db, item):
+            self.seen_true = True
+            return True
+        else:
+            self.seen_false = True
+            return False
+
+    def check(self) -> bool:
+        return self.seen_true and self.seen_false
+
+    def reset(self):
+        self.seen_true = False
+        self.seen_false = False
+
+
+# Decide on the adder implementation to use, from the command line arguments.
+if len(sys.argv) > 2 and sys.argv[2] == "--add-item":
+    adder = AddItem()
+elif len(sys.argv) > 2 and sys.argv[2] == "--add-unique-item":
+    adder = AddUniqueItem()
+else:
+    raise Exception(
+        "second argument should be `--add-item` or `--add-unique-item`"
+    )
+
+python_queue = WorkQueue(KeyPrefix("python_jobs"))
+shared_queue = WorkQueue(KeyPrefix("shared_jobs"))
+
+# Add 100 unique jobs to the python queue:
+for idx in range(100, 200):
+    id = str(idx)
+    if adder.add_item(python_queue, db, Item(str(idx), id)) and idx == 150:
+        # If we're ahead at item 150, sleep so we end up behind
+        sleep(10)
+
+assert adder.check()
+adder.reset()
+
+# Add 100 unique jobs to the shared queue:
+for idx in range(100, 200):
+    id = str(idx)
+    if adder.add_item(shared_queue, db, Item(str(idx), id)) and idx == 150:
+        # If we're ahead at item 150, sleep so we end up behind (this sleep is intentionally longer
+        # than the last one)
+        sleep(20)
+
+assert adder.check()
+adder.reset()
+
+# Add 100 jobs, each 10 times, to both queues:
+for idx in range(0, 1000):
+    id = str(idx//10)
+    if adder.add_item(python_queue, db, Item(str(idx), id)) and idx == 500:
+        sleep(30)
+    adder.add_item(shared_queue, db, Item(str(idx), id))
+
+assert adder.check()
+adder.reset()
diff --git a/tests/add-item-tests/run.sh b/tests/add-item-tests/run.sh
new file mode 100755
index 0000000..7aaf460
--- /dev/null
+++ b/tests/add-item-tests/run.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+set -e
+
+echo --- Single Threaded Tests ---
+
+echo ----------- Python ----------
+echo ---------- add_item ---------
+echo -e 'save ""\nappendonly no' | redis-server - &
+REDIS_PID=$!
+# There won't be duplicate items from other workers, so this will fail.
+! python3 ./python-tests.py localhost --add-item
+# The previous worker didn't complete, so the queue shouldn't contain everything.
+! python3 ./check.py localhost
+kill $REDIS_PID
+wait $REDIS_PID
+
+echo ------ add_unique_item ------
+echo -e 'save ""\nappendonly no' | redis-server - &
+REDIS_PID=$!
+# No checks, so this must pass
+python3 ./python-tests.py localhost --add-unique-item
+# There will be duplicates, so check should fail
+! python3 ./check.py localhost
+kill $REDIS_PID
+wait $REDIS_PID
+
+
+echo ---- Dual Threaded Tests ----
+
+echo ----------- Python ----------
+echo ------ add_unique_item ------
+echo -e 'save ""\nappendonly no' | redis-server - &
+REDIS_PID=$!
+# No checks, so this should succeed.
+python3 ./python-tests.py localhost --add-unique-item &
+FIRST_THREAD_PID=$!
+# No checks, so this should succeed.
+python3 ./python-tests.py localhost --add-unique-item
+wait $FIRST_THREAD_PID
+# There will be duplicates, so check should fail
+! python3 ./check.py localhost
+kill $REDIS_PID
+wait $REDIS_PID
+
+echo ---------- add_item ---------
+echo -e 'save ""\nappendonly no' | redis-server - &
+REDIS_PID=$!
+# There will be jobs from the other worker, so this should pass checks.
+python3 ./python-tests.py localhost --add-item &
+FIRST_THREAD_PID=$!
+# There will be jobs from the other worker, so this should pass checks.
+python3 ./python-tests.py localhost --add-item
+wait $FIRST_THREAD_PID
+# And there should be no duplicate items!
+python3 ./check.py localhost
+kill $REDIS_PID
+wait $REDIS_PID
diff --git a/tests/all-tests.sh b/tests/all-tests.sh
new file mode 100755
index 0000000..7e699b6
--- /dev/null
+++ b/tests/all-tests.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+set -e
+
+echo -e 'save ""\nappendonly no' | redis-server - &
+REDIS_PID=$!
+./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs
+kill $REDIS_PID
+wait $REDIS_PID
+
+echo -e 'save ""\nappendonly no' | redis-server - &
+REDIS_PID=$!
+./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs -c ./go-cleaner/run.sh
+kill $REDIS_PID
+wait $REDIS_PID
+
+echo -e 'save ""\nappendonly no' | redis-server - &
+REDIS_PID=$!
+./run-test.sh -t go_jobs,python_jobs,rust_jobs,node_jobs,dotnet_jobs -c ./dotnet-cleaner/run.sh
+kill $REDIS_PID
+wait $REDIS_PID
+
+cd ./add-item-tests
+./run.sh
diff --git a/tests/dotnet-cleaner/Cleaner/Cleaner.csproj b/tests/dotnet-cleaner/Cleaner/Cleaner.csproj
new file mode 100644
index 0000000..8551d8b
--- /dev/null
+++ b/tests/dotnet-cleaner/Cleaner/Cleaner.csproj
@@ -0,0 +1,18 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net8.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+
+
+
+
+
+
+
+
+</Project>
diff --git a/tests/dotnet-cleaner/Cleaner/Program.cs b/tests/dotnet-cleaner/Cleaner/Program.cs
new file mode 100644
index 0000000..a245261
--- /dev/null
+++ b/tests/dotnet-cleaner/Cleaner/Program.cs
@@ -0,0 +1,33 @@
+using RedisWorkQueue;
+using FreeRedis;
+
+class Program
+{
+    public static void Main(string[] args)
+    {
+        RedisClient db = new RedisClient(args[0]);
+        for (; ; )
+        {
+            string? instruction = Console.ReadLine();
+            if (instruction == null) break;
+            if (instruction == "") throw new Exception("input line must be non-empty");
+            string[] parts = instruction.Split(':', 2);
+            if (parts.Length != 2) throw new Exception("input line must be of the format light:queue-name or deep:queue-name");
+            string queueName = parts[1];
+            var queue = new WorkQueue(new KeyPrefix(queueName));
+            switch (parts[0])
+            {
+                case "light":
+                    queue.LightClean(db);
+                    Console.WriteLine("light cleaned " + queueName);
+                    break;
+                case "deep":
+                    queue.DeepClean(db);
+                    Console.WriteLine("deep cleaned " + queueName);
+                    break;
+                default:
+                    throw new Exception($"invalid cleaning mode: {parts[0]}, should be \"light\" or \"deep\"");
+            }
+        }
+    }
+}
diff --git a/tests/dotnet-cleaner/run.sh b/tests/dotnet-cleaner/run.sh
new file mode 100755
index 0000000..7d45836
--- /dev/null
+++ b/tests/dotnet-cleaner/run.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+cd "$(realpath "$(dirname $0)")"/Cleaner
+exec dotnet run -v quiet "$@"
diff --git a/tests/dotnet/RedisWorkQueueTests/Program.cs b/tests/dotnet/RedisWorkQueueTests/Program.cs
index 5f170c9..c16b690 100644
--- a/tests/dotnet/RedisWorkQueueTests/Program.cs
+++ b/tests/dotnet/RedisWorkQueueTests/Program.cs
@@ -70,7 +70,7 @@ public static void Main(string[] args)
         Console.WriteLine("Result: ", jsonResult);
 
         if (sharedJobCounter % 12 == 0)
-            Thread.Sleep(1000*(sharedJobCounter % 4));
+            Thread.Sleep(1000 * (sharedJobCounter % 4));
 
         db.Set(sharedResultsKey.Of(job.ID), jsonResult);
@@ -106,7 +106,7 @@ public static void Main(string[] args)
         Console.WriteLine($"Result: {result}");
 
         if (dotNetJobCounter % 25 == 0)
-            Thread.Sleep(1000*(sharedJobCounter % 20));
+            Thread.Sleep(1000 * (sharedJobCounter % 20));
 
         db.Set(dotNetResultsKey.Of(job.ID), new byte[1] { (byte)result });
@@ -116,13 +116,20 @@ public static void Main(string[] args)
 
         if (dotNetQueue.Complete(db, job))
         {
+            if (dotNetJobCounter % 6 == 0)
+            {
+                Console.WriteLine("Double completing");
+                if (dotNetQueue.Complete(db, job))
+                    throw new Exception("double completion should have failed!");
+            }
+
             Console.WriteLine("Spawning shared jobs");
-            sharedQueue.AddItem(db, Item.FromJson(new SharedJobData()
+            if (!sharedQueue.AddItem(db, Item.FromJson(new SharedJobData()
             {
                 a = 19,
                 b = result
-            }));
-            sharedQueue.AddItem(db, Item.FromJson(new SharedJobData()
+            }))) throw new Exception("item was not added");
+            sharedQueue.AddUniqueItem(db, Item.FromJson(new SharedJobData()
             {
                 a = 23,
                 b = result
diff --git a/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj b/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj
index 4b8bdae..9d870bc 100644
--- a/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj
+++ b/tests/dotnet/RedisWorkQueueTests/RedisWorkQueueTests.csproj
@@ -1,18 +1,18 @@
 
-    <TargetFramework>net6.0</TargetFramework>
+    <TargetFramework>net8.0</TargetFramework>
     <ImplicitUsings>enable</ImplicitUsings>
     <Nullable>enable</Nullable>
     <OutputType>exe</OutputType>
 
-
+
 
-
-
+
+
diff --git a/tests/go-cleaner/go.mod b/tests/go-cleaner/go.mod
new file mode 100644
index 0000000..3da9955
--- /dev/null
+++ b/tests/go-cleaner/go.mod
@@ -0,0 +1,16 @@
+module github.com/mevitae/redis-work-queue/tests/go-cleaner
+
+replace github.com/mevitae/redis-work-queue/go => ../../go
+
+go 1.20
+
+require (
+    github.com/mevitae/redis-work-queue/go v0.3.0
+    github.com/redis/go-redis/v9 v9.6.1
+)
+
+require (
+    github.com/cespare/xxhash/v2 v2.3.0 // indirect
+    github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+    github.com/google/uuid v1.6.0 // indirect
+)
diff --git a/tests/go-cleaner/go.sum b/tests/go-cleaner/go.sum
new file mode 100644
index 0000000..cb0d46b
--- /dev/null
+++ b/tests/go-cleaner/go.sum
@@ -0,0 +1,10 @@
+github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
+github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
diff --git a/tests/go-cleaner/main.go b/tests/go-cleaner/main.go
new file mode 100644
index 0000000..0d2de02
--- /dev/null
+++ b/tests/go-cleaner/main.go
@@ -0,0 +1,50 @@
+package main
+
+import (
+    "bufio"
+    "context"
+    "fmt"
+    "io"
+    "os"
+    "strings"
+
+    workqueue "github.com/mevitae/redis-work-queue/go"
+    "github.com/redis/go-redis/v9"
+)
+
+func main() {
+    if len(os.Args) < 2 {
+        panic("first command line argument must be redis host")
+    }
+
+    db := redis.NewClient(&redis.Options{
+        Addr: os.Args[1],
+    })
+    ctx := context.Background()
+
+    stdin := bufio.NewReader(os.Stdin)
+    for {
+        instruction, err := stdin.ReadString('\n')
+        if err != nil {
+            if err == io.EOF {
+                break
+            }
+            panic(err)
+        }
+        instruction = instruction[:len(instruction)-1]
+        if queueName, ok := strings.CutPrefix(instruction, "light:"); ok {
+            queue := workqueue.NewWorkQueue(workqueue.KeyPrefix(queueName))
+            err = queue.LightClean(ctx, db)
+            fmt.Println("light cleaned", queueName)
+        } else if queueName, ok := strings.CutPrefix(instruction, "deep:"); ok {
+            queue := workqueue.NewWorkQueue(workqueue.KeyPrefix(queueName))
+            err = queue.DeepClean(ctx, db)
+            fmt.Println("deep cleaned", queueName)
+        } else {
+            panic("invalid cleaning mode")
+        }
+        if err != nil {
+            panic(err)
+        }
+    }
+}
diff --git a/tests/go-cleaner/run.sh b/tests/go-cleaner/run.sh
new file mode 100755
index 0000000..f799610
--- /dev/null
+++ b/tests/go-cleaner/run.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+cd "$(realpath "$(dirname $0)")"
+exec go run . "$@"
diff --git a/tests/go/go.mod b/tests/go/go.mod
index 5180e60..802fc25 100644
--- a/tests/go/go.mod
+++ b/tests/go/go.mod
@@ -5,12 +5,12 @@ replace github.com/mevitae/redis-work-queue/go => ../../go
 go 1.20
 
 require (
-    github.com/mevitae/redis-work-queue/go v0.1.0
-    github.com/redis/go-redis/v9 v9.0.2
+    github.com/mevitae/redis-work-queue/go v0.3.0
+    github.com/redis/go-redis/v9 v9.6.1
 )
 
 require (
-    github.com/cespare/xxhash/v2 v2.2.0 // indirect
+    github.com/cespare/xxhash/v2 v2.3.0 // indirect
     github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
-    github.com/google/uuid v1.3.0 // indirect
+    github.com/google/uuid v1.6.0 // indirect
 )
diff --git a/tests/go/go.sum b/tests/go/go.sum
index 5362895..cb0d46b 100644
--- a/tests/go/go.sum
+++ b/tests/go/go.sum
@@ -1,14 +1,10 @@
-github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
-github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
-github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
-github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
-github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
-github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
-github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
-github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
-github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
-gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
+github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
diff --git a/tests/go/main.go b/tests/go/main.go
index 7d0e621..d9cb936 100644
--- a/tests/go/main.go
+++ b/tests/go/main.go
@@ -144,6 +144,17 @@ func main() {
             panic(err)
         }
         if completed {
+            if goJobCounter%6 == 0 {
+                fmt.Println("Double completing")
+                doubleCompleted, err := goQueue.Complete(ctx, db, job)
+                if err != nil {
+                    panic(err)
+                }
+                if doubleCompleted {
+                    panic("double completion should have failed!")
+                }
+            }
+
             fmt.Println("Spawning shared jobs")
             // If we successfully completed the result, create two new shared jobs.
             item, err := workqueue.NewItemFromJSONData(SharedJobData{
@@ -153,10 +164,13 @@ func main() {
             if err != nil {
                 panic(err)
             }
-            err = sharedQueue.AddItem(ctx, db, item)
+            added, err := sharedQueue.AddItem(ctx, db, item)
             if err != nil {
                 panic(err)
             }
+            if !added {
+                panic("item was not added")
+            }
 
             item, err = workqueue.NewItemFromJSONData(SharedJobData{
                 A: 11,
@@ -165,7 +179,7 @@ func main() {
             if err != nil {
                 panic(err)
             }
-            err = sharedQueue.AddItem(ctx, db, item)
+            err = sharedQueue.AddUniqueItem(ctx, db, item)
             if err != nil {
                 panic(err)
             }
diff --git a/tests/job-spawner-and-cleaner.py b/tests/job-spawner-and-cleaner.py
index a072393..dbc5822 100644
--- a/tests/job-spawner-and-cleaner.py
+++ b/tests/job-spawner-and-cleaner.py
@@ -1,5 +1,6 @@
 import sys
 import json
+import subprocess
 from time import sleep
 
 import redis
@@ -11,18 +12,94 @@
 if len(sys.argv) < 2:
     raise Exception("first command line argument must be redis host")
 
 host = sys.argv[1].split(":")
-queue_list_names = sys.argv[2].split(" ")
+if len(sys.argv) < 3:
+    raise Exception("second command line argument must be a space-separated list of queue names")
+queue_names = sys.argv[2].split(" ")
 
 db = redis.Redis(host=host[0], port=int(host[1]) if len(host) > 1 else 6379)
 
 if len(db.keys("*")) > 0:
     raise Exception("redis database isn't clean")
 
-shared_queue = WorkQueue(KeyPrefix("shared_jobs"))
+shared_queue_name = "shared_jobs"
+shared_queue = WorkQueue(KeyPrefix(shared_queue_name))
 queue_list = list(map(
     lambda name: WorkQueue(KeyPrefix(name)),
-    queue_list_names,
+    queue_names,
 ))
+
+def python_light_clean():
+    for queue in queue_list:
+        queue.light_clean(db)
+    shared_queue.light_clean(db)
+
+def python_deep_clean():
+    for queue in queue_list:
+        queue.deep_clean(db)
+    shared_queue.deep_clean(db)
+
+class ExternalCleaner:
+    """This wraps an external process to be used for queue cleaning.
+
+    The process should read lines from stdin and write lines to stdout.
+
+    Upon reading a line of the form "<mode>:<queue name>" (where mode is "light" or "deep"), that
+    queue should be cleaned. After the queue has been cleaned, the program should write "<mode>
+    cleaned <queue name>" to stdout, followed by a newline.
+
+    The program must process the cleaning requests in the order they are sent."""
+
+    def __init__(self, command: list[str]):
+        self.child: subprocess.Popen = subprocess.Popen(
+            command,
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=sys.stderr,
+            text=True,
+        )
+
+    def clean(self, mode: str, queue_name: str):
+        """Request and wait for a single queue to be cleaned (mode should be "light" or "deep")."""
+        if not self.check():
+            raise Exception('cleaner process is dead')
+
+        assert self.child.stdin is not None
+        self.child.stdin.write(mode + ":" + queue_name + "\n")
+        self.child.stdin.flush()
+
+        assert self.child.stdout is not None
+        output = ""
+        while not output.startswith(mode + " cleaned "):
+            if output != "":
+                print(output)
+            output = self.child.stdout.readline().strip()
+        assert output == mode + " cleaned " + queue_name
+
+    def clean_all(self, mode: str):
+        """Clean all the work queues with the provided mode (either "light" or "deep")."""
+        for queue_name in queue_names:
+            self.clean(mode, queue_name)
+        self.clean(mode, shared_queue_name)
+
+    def light_clean_all(self):
+        """Light clean all the work queues."""
+        self.clean_all("light")
+
+    def deep_clean_all(self):
+        """Deep clean all the work queues."""
+        self.clean_all("deep")
+
+    def check(self) -> bool:
+        """Check that the process is still running."""
+        return self.child.poll() is None
+
+light_clean = python_light_clean
+deep_clean = python_deep_clean
+if len(sys.argv) > 3 and sys.argv[3] != "":
+    # Pass the redis host to the cleaner command
+    cleaner = ExternalCleaner([sys.argv[3], sys.argv[1]])
+    light_clean = cleaner.light_clean_all
+    deep_clean = cleaner.deep_clean_all
+
 counter = 0
 doom_counter = 0
 revived = False
@@ -57,10 +134,24 @@
             queue.add_item(db, Item(bytes([n])))
     else:
         # Otherwise, clean!
-        print("Cleaning")
-        for queue in queue_list:
-            queue.light_clean(db)
-        shared_queue.light_clean(db)
+        print("Cleaning " + str(counter % 13))
+        # This creates all the possible messed-up cleaning cases:
+        if counter % 13 == 2 or counter % 13 == 3:
+            # Occasionally remove items from processing
+            db.rpop(KeyPrefix(shared_queue_name).of(":processing"))
+        elif counter % 13 == 4:
+            # Occasionally remove items from queue
+            db.rpop(KeyPrefix(shared_queue_name).of(":queue"))
+        elif counter % 13 == 5:
+            # Occasionally copy items from queue -> processing
+            items = db.lrange(KeyPrefix(shared_queue_name).of(":queue"), 0, 1)
+            print(items)
+            if len(items) > 0:
+                db.lpush(KeyPrefix(shared_queue_name).of(":processing"), items[0])
+        elif counter % 13 == 6:
+            # Occasionally deep clean
+            deep_clean()
+        light_clean()
 
     # The `doom_counter` counts the number of consecutive times all the lengths are 0.
     doom_counter = doom_counter + 1
     if all(map(
         lambda queue: queue.queue_len(db) == 0 and queue.processing(db) == 0,
@@ -98,15 +189,15 @@
 }
 
 
-for queue_name in queue_list_names[:]:
+for queue_name in queue_names[:]:
     if queue_name not in expecting_dict_config:
-        queue_list_names.remove(queue_name)
+        queue_names.remove(queue_name)
 
 keys_to_delete = []
 
 for queue_name in expecting_dict_config:
-    if queue_name not in queue_list_names:
+    if queue_name not in queue_names:
         keys_to_delete.append(queue_name)
 
 for queue_name in keys_to_delete:
@@ -152,14 +243,15 @@
         raise Exception('found unexpected key: ' + key)
 
 updated_names = []
-for name in queue_list_names:
+for name in queue_names:
     updated_names.append(name.replace("_jobs", ""))
 
 total_count_keys = 0
 for key in shared_counts.keys():
     total_count_keys += shared_counts[key]
 
-maximum_allowed = total_count_keys/len(shared_counts)*1.2
+minimum_allowed = total_count_keys/len(shared_counts)*0.7
+maximum_allowed = total_count_keys/len(shared_counts)*1.3
 
 print("Maximum number of job counts:", maximum_allowed)
@@ -167,4 +259,5 @@
     assert key in updated_names
     # Check that it's fairly well balanced
     print(key, "Job counts:", shared_counts[key])
+    assert minimum_allowed < shared_counts[key]
     assert shared_counts[key] < maximum_allowed
diff --git a/tests/node/index.ts b/tests/node/index.ts
index 16207a2..0b57d66 100644
--- a/tests/node/index.ts
+++ b/tests/node/index.ts
@@ -82,7 +82,14 @@ async function main() {
             // Complete the job unless we're 'unlucky' and crash again
             if (nodeJobCounter % 29 !== 0) {
                 if (await nodeQueue.complete(db, job)) {
-                    await sharedQueue.addItem(
+                    if (nodeJobCounter % 6 === 0) {
+                        console.log("Double completing")
+                        if (await nodeQueue.complete(db, job)) {
+                            throw new Error("double completion should have failed!");
+                        }
+                    }
+
+                    if (!await sharedQueue.addItem(
                         db,
                         new Item(
                             JSON.stringify({
@@ -90,8 +97,8 @@ async function main() {
                                 b: result,
                             }),
                         ),
-                    );
-                    await sharedQueue.addItem(
+                    )) throw new Error("item was not added");
+                    await sharedQueue.addUniqueItem(
                         db,
                         new Item(
                             JSON.stringify({
diff --git a/tests/python-tests.py b/tests/python-tests.py
index 2c2f4cc..39eb11f 100644
--- a/tests/python-tests.py
+++ b/tests/python-tests.py
@@ -96,12 +96,18 @@
             print("Completing")
             # If we successfully completed the result, create two new shared jobs.
             if python_queue.complete(db, job):
+                if python_job_counter % 6 == 0:
+                    print("Double completing")
+                    if python_queue.complete(db, job):
+                        raise Exception("double completion should have failed")
+
                 print("Spawning shared jobs")
-                shared_queue.add_item(db, Item.from_json_data({
+                if not shared_queue.add_item(db, Item.from_json_data({
                     'a': 13,
                     'b': result,
-                }))
-                shared_queue.add_item(db, Item.from_json_data({
+                })):
+                    raise Exception("item was not added")
+                shared_queue.add_unique_item(db, Item.from_json_data({
                     'a': 17,
                     'b': result,
                 }))
diff --git a/tests/run-test.sh b/tests/run-test.sh
index 142eb99..7a3d5d4 100755
--- a/tests/run-test.sh
+++ b/tests/run-test.sh
@@ -4,6 +4,7 @@ set -e
 # Default values
 tests=""
 host="localhost:6379"
+cleaner=""
 
 
 display_usage() {
@@ -11,6 +12,7 @@ display_usage() {
     echo "Options:"
     echo "  -t, --tests    Specify test categories (go_jobs, python_jobs, rust_jobs, node_jobs, dotnet_jobs). Example use './run-test.sh --tests "go_jobs,python_jobs"'"
     echo "  -h, --host     Set the host (default: localhost:6379)"
+    echo "  -c, --cleaner  Set the cleaner binary used to clean the work queues; see the docs in job-spawner-and-cleaner.py"
     echo "  -h, --help     Display this help message"
 }
@@ -30,6 +32,11 @@ while [[ $# -gt 0 ]]; do
             shift
             shift
             ;;
+        -c|--cleaner)
+            cleaner="$2"
+            shift
+            shift
+            ;;
         -h|--help)
             display_usage
             exit 0
@@ -93,8 +100,10 @@ if [[ "$tests" == *"dotnet"* ]]; then
 fi
 
 if [[ "$tests" == *"node"* ]]; then
-    cd node
     echo "Installing Node.js dependencies"
+    cd ../node
+    npm install
+    cd ../tests/node
     npm ci
     echo "Running Node.js workers..."
     npm run test "$host" > /tmp/redis-work-queue-test-logs/node-worker-1.txt &
@@ -105,4 +114,4 @@ fi
 
 echo "Running spawner..."
-python3 job-spawner-and-cleaner.py "$host" "$tests"
+python3 job-spawner-and-cleaner.py "$host" "$tests" "$cleaner"
diff --git a/tests/rust/Cargo.lock b/tests/rust/Cargo.lock
index d8358f6..9da5d53 100644
--- a/tests/rust/Cargo.lock
+++ b/tests/rust/Cargo.lock
@@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
 
 [[package]]
 name = "arc-swap"
-version = "1.6.0"
+version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
+checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
 
 [[package]]
 name = "async-channel"
@@ -126,9 +126,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
 
 [[package]]
 name = "async-trait"
-version = "0.1.73"
+version = "0.1.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
+checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -280,9 +280,9 @@ dependencies = [
 
 [[package]]
 name = "form_urlencoded"
-version = "1.2.0"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
+checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
 dependencies = [
  "percent-encoding",
 ]
@@ -428,9 +428,9 @@ checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
 
 [[package]]
 name = "idna"
-version = "0.4.0"
+version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
+checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
 dependencies = [
  "unicode-bidi",
  "unicode-normalization",
@@ -527,6 +527,34 @@ dependencies = [
  "windows-sys",
 ]
+[[package]]
+name = "num-bigint"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
+dependencies = [
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-integer"
+version = "0.1.46"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
+dependencies = [
+ "num-traits",
+]
+
+[[package]]
+name = "num-traits"
+version = "0.2.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
+dependencies = [
+ "autocfg",
+]
+
 [[package]]
 name = "object"
 version = "0.31.1"
@@ -550,9 +578,9 @@ checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
 
 [[package]]
 name = "percent-encoding"
-version = "2.3.0"
+version = "2.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
+checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 
 [[package]]
 name = "pin-project"
@@ -610,18 +638,18 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.66"
+version = "1.0.86"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
+checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
 dependencies = [
  "unicode-ident",
 ]
 
 [[package]]
 name = "quote"
-version = "1.0.33"
+version = "1.0.37"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
+checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
 dependencies = [
  "proc-macro2",
 ]
@@ -658,9 +686,9 @@ dependencies = [
 [[package]]
 name = "redis"
-version = "0.23.2"
+version = "0.26.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ffd6543a7bc6428396845f6854ccf3d1ae8823816592e2cbe74f20f50f209d02"
+checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f"
 dependencies = [
  "arc-swap",
  "async-std",
@@ -670,11 +698,12 @@ dependencies = [
  "futures",
  "futures-util",
  "itoa",
+ "num-bigint",
  "percent-encoding",
  "pin-project-lite",
  "ryu",
  "sha1_smol",
- "socket2 0.4.9",
+ "socket2 0.5.3",
  "tokio",
  "tokio-retry",
  "tokio-util",
@@ -683,7 +712,7 @@ dependencies = [
 
 [[package]]
 name = "redis-work-queue"
-version = "0.1.6"
+version = "0.3.0"
 dependencies = [
  "futures",
  "redis",
@@ -694,7 +723,7 @@ dependencies = [
 
 [[package]]
 name = "rust-tests"
-version = "0.1.0"
+version = "0.3.0"
 dependencies = [
  "futures-lite",
  "redis",
@@ -797,9 +826,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.29"
+version = "2.0.76"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a"
+checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -904,9 +933,9 @@ dependencies = [
 
 [[package]]
 name = "url"
-version = "2.4.0"
+version = "2.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
+checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c"
 dependencies = [
  "form_urlencoded",
  "idna",
diff --git a/tests/rust/Cargo.toml b/tests/rust/Cargo.toml
index 2b3a604..c8744e2 100644
--- a/tests/rust/Cargo.toml
+++ b/tests/rust/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "rust-tests"
-version = "0.1.0"
+version = "0.3.0"
 edition = "2021"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -8,6 +8,6 @@ edition = "2021"
 [dependencies]
 redis-work-queue = { path = "../../rust" }
 futures-lite = "1"
-redis = "0.23"
+redis = "0.26"
 serde = "1"
 serde_json = "1"
diff --git a/tests/rust/src/main.rs b/tests/rust/src/main.rs
index 1472cc1..6b7c8da 100644
--- a/tests/rust/src/main.rs
+++ b/tests/rust/src/main.rs
@@ -30,7 +30,7 @@ async fn async_main() -> RedisResult<()> {
         .next()
         .expect("first command line argument must be redis host");
     let db = &mut redis::Client::open(format!("redis://{host}/"))?
-        .get_async_connection()
+        .get_multiplexed_async_connection()
         .await?;
 
     let rust_results_key = KeyPrefix::new("results:rust:".to_string());
@@ -49,17 +49,18 @@ async fn async_main() -> RedisResult<()> {
             shared_job_counter += 1;
 
             // First, try to get a job from the shared job queue
-            let timeout = if shared_job_counter%5 == 0 {
+            let timeout = if shared_job_counter % 5 == 0 {
                 Some(Duration::from_secs(1))
             } else {
                 Some(Duration::ZERO)
             };
             println!("Leasing from shared with timeout: {:?}", timeout);
-            let Some(job) = shared_queue.lease(
-                db,
-                timeout,
-                Duration::from_secs(2),
-            ).await? else { continue };
+            let Some(job) = shared_queue
+                .lease(db, timeout, Duration::from_secs(2))
+                .await?
+            else {
+                continue;
+            };
 
             // Also, if we get 'unlucky', crash while completing the job.
             if shared_job_counter % 7 == 0 {
                 println!("Dropping job");
@@ -97,17 +98,18 @@ async fn async_main() -> RedisResult<()> {
             rust_job_counter += 1;
 
             // First, try to get a job from the rust job queue
-            let timeout = if shared_job_counter%6 == 0 {
+            let timeout = if shared_job_counter % 6 == 0 {
                 Some(Duration::from_secs(2))
             } else {
                 Some(Duration::ZERO)
             };
             println!("Leasing from rust with timeout: {:?}", timeout);
-            let Some(job) = rust_queue.lease(
-                db,
-                timeout,
-                Duration::from_secs(1),
-            ).await? else { continue };
+            let Some(job) = rust_queue
+                .lease(db, timeout, Duration::from_secs(1))
+                .await?
+            else {
+                continue;
+            };
 
             // Also, if we get 'unlucky', crash while completing the job.
             if rust_job_counter % 7 == 0 {
                 println!("Dropping job");
@@ -132,6 +134,13 @@ async fn async_main() -> RedisResult<()> {
             if rust_job_counter % 29 != 0 {
                 println!("Completing");
                 if rust_queue.complete(db, &job).await? {
+                    if rust_job_counter % 6 == 0 {
+                        println!("Double completing");
+                        if rust_queue.complete(db, &job).await? {
+                            panic!("double completion should have failed!");
+                        }
+                    }
+
                     println!("Spawning shared jobs");
                     // If we successfully completed the result, create two new shared jobs.
                     let item = Item::from_json_data(&SharedJobData {
@@ -139,14 +148,16 @@ async fn async_main() -> RedisResult<()> {
                         b: job.data[0] as i32,
                     })
                     .unwrap();
-                    shared_queue.add_item(db, &item).await?;
+                    if !shared_queue.add_item(db, &item).await? {
+                        panic!("item was not added");
+                    }
 
                     let item = Item::from_json_data(&SharedJobData {
                         a: 5,
                         b: job.data[0] as i32,
                     })
                     .unwrap();
-                    shared_queue.add_item(db, &item).await?;
+                    shared_queue.add_unique_item(db, &item).await?;
                 }
             } else {
                 println!("Dropping");
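
The external-cleaner protocol this patch introduces in `job-spawner-and-cleaner.py` (read `mode:queue-name` instruction lines on stdin, reply `mode cleaned queue-name` on stdout, in order) now has Go and C# implementations. As a minimal sketch of the same loop in Python — with the actual cleaning work injected as callables, since wiring up `WorkQueue.light_clean`/`deep_clean` against a live redis connection is out of scope here (`parse_instruction` and `serve` are illustrative names, not part of the patch):

```python
import io
import sys


def parse_instruction(line: str) -> tuple[str, str]:
    """Split a 'mode:queue-name' instruction line, validating the mode."""
    mode, sep, queue_name = line.rstrip("\n").partition(":")
    if sep != ":" or mode not in ("light", "deep"):
        raise ValueError(f"invalid instruction: {line!r}")
    return mode, queue_name


def serve(cleaners, stdin=sys.stdin, stdout=sys.stdout):
    """Run the cleaner loop: clean each requested queue, reply in request order."""
    for line in stdin:
        mode, queue_name = parse_instruction(line)
        # e.g. cleaners["light"] could be
        # lambda name: WorkQueue(KeyPrefix(name)).light_clean(db)
        cleaners[mode](queue_name)
        # The reply format is what ExternalCleaner.clean waits for.
        print(f"{mode} cleaned {queue_name}", file=stdout, flush=True)
```

Built over a redis connection as `cleaners = {"light": ..., "deep": ...}`, this would be drivable by `ExternalCleaner` (via `run-test.sh -c`) exactly like the Go and C# cleaners.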