feat(batch): add flag in SqsFifoProcessor to enable continuous message processing#3954
feat(batch): add flag in SqsFifoProcessor to enable continuous message processing#3954leandrodamascena merged 18 commits intoaws-powertools:developfrom leandrodamascena:improve-sqs-fifo-batch
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
❗ Your organization needs to install the Codecov GitHub app to enable full functionality. Additional details and impacted files@@ Coverage Diff @@
## develop #3954 +/- ##
===========================================
- Coverage 96.38% 96.31% -0.07%
===========================================
Files 214 215 +1
Lines 10030 10277 +247
Branches 1846 1914 +68
===========================================
+ Hits 9667 9898 +231
- Misses 259 271 +12
- Partials 104 108 +4 ☔ View full report in Codecov by Sentry. |
…id with failed messages
Of course, @rubenfonseca! Let's explore ways to refactor it for better readability and reduced cognitive load. |
|
@leandrodamascena I've pushed a suggestion, and kinda rewrote the whole FIFO processor. Instead of overriding the |
leandrodamascena
left a comment
There was a problem hiding this comment.
Hey @rubenfonseca! This refactoring is impressive, and the code is much clearer now. However, I believe I've found a bug and would appreciate your insights on it.
Also, I think I need to review the tests because we didn't catch this bug in those tests.
|
@leandrodamascena ready for the next review |
Thanks @rubenfonseca! Everything is working as expected, and the codebase is so clean! I've only left two minor comments regarding imports, and then we're ready to merge. |
|




Issue number: #2981
Summary
Changes
This pull request introduces a new feature to the
SqsFifoProcessorclass, allowing for continuous message processing by adding a flag. With this enhancement, users can choose to maintain message processing without halting after an initial failure.User experience
payload.json
{ "Records":[ { "messageId":"24045de4-a92e-4277-840e-994236b646f6", "receiptHandle":"AQEBL8wKtlV8KYnf1Jgz2tdTrAfBSWIhZ/0snUM1FrCK2BsWOq02i0f+L7ufkQHHebbWJ/ZRuc8AvGeB90n/q86llnPhZJu5hSSndYhhviDQXZf/TsIb/waNVWdW70AAp8Mg5MBWAkcdWNm4nz4xqKKi+ntjqh1wwM8RwFTrJezwxxBaWRcTZUS2XsaYX+gGhQClnWnppfJ8OUiU+l9mOd92ciDRDIlAKKcFgfAoqrkDig8OQzsB+ajwAnM2gdwWyK0JjTYS1vgR6i8quepgesY+hg==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384129", "MessageGroupId":"2", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"ae0eee0c918b8b88835554d31d9473ad04465a3866e48dba7b1e0fab51d2c4a4", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"29a62446b7883b1840cd85bd79091e44", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" }, { "messageId":"455fc143-6eb4-4663-bbda-0c19dd04f756", "receiptHandle":"AQEB7EmRsjoiaFSDeZxDkq9PKfcpamqUrshhfK5vGsX/jZE09+b21OHfG0zsHkML4VDsrgskEzp2AaNfyaZXk7dfa31hEHMRa2EscRIrj6CSRF78iHSRwVsxrRSpx8w5NW51Z6PGOtVK+Vi5Cqto28CACcGjN69tWiNFHYwczVVb0oSRtNSk6CGTijKKNb5BzVXfzGmukEEk+E2+Qqmh8vEoKiqVLqF7Q9Wbj5znMxkUMFbs/1r8KEpj74D2s5Ch9Mi3wOapY1fY0km9mElZp4OprA==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384130", "MessageGroupId":"2", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"28ce33b1552d6ee566c48eb91b4fc29bded1fa7fea62485ca7152c3ec68e79f0", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"05c7dd78a740a557a3d8712fa28650a6", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" }, { "messageId":"f7188ab6-ae5e-47a2-83bf-7da7ca110994", "receiptHandle":"AQEBCeLYV6Rxc40DA95XwfQGTZtxlGYLhpQTEERZCObtx7kr0HvDBFOjmnvKdbf5ksTSDj9ElNHHAEP2PhLDqhFhpdx8qgbgoPgXtTlcLpadgd2YtGb+bCPSJb5AoMtsz5vBmcSw1EjigYwE8+SHqCeP+rhQXl432MJtHxTsu3HNSG/1sc3ICw/uwM7K9LQ9tw0BS13WNLs3TwHgbc2FqWXBqooSAKLE0y0yPIDGvY/39AL6npBt3+65PIYMRgeAXz6NAxE0g579d+iX7nnPudDnXw==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384131", "MessageGroupId":"4", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"7b46f0df14493ebf41b7ef5c6d1da625859a2205324351e4022db9203b000295", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"7d6fbfc49a1f8d404729fbea3807fb80", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" }, { "messageId":"c627fea1-e548-43b4-b23b-fd33d93f96e8", "receiptHandle":"AQEBrrpYtiVCMqkLpc6L9oZrh/MRNlZxaE9tLTygm7fTVyQA0UNFfeNZslPemxbnx1XsQVhe/pXfpuzPzuKWSsbhP44+39D5IZI8l8Xnx4nweznqolih+JaDCt/7m3BvCXrPxqWDAHlGgmIZcoFMS9lU58zY5tUU2dALlz+j3Ju4x90grsr3fCiIziipX/RYRC3gzoUoLVxTfIrQLQ3Jgvsxm4UA6LIuR7gj/gcwpjMYt507FAuhBJdSB8kKlBoYMtul0vLegfMPl55BfPXdNEMdLg==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384133", "MessageGroupId":"4", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"1eea03c3f7e782c7bdc2f2a917f40389314733ff39f5ab16219580c0109ade98", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"6aae5e0be0dc30894275756efda242e5", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" }, { "messageId":"c627fea1-e548-43b4-b23b-fd33d93f9GR6", "receiptHandle":"AQEBrrpYtiVCMqkLpc6L9oZrh/MRNlZxaE9tLTygm7fTVyQA0UNFfeNZslPemxbnx1XsQVhe/pXfpuzPzuKWSsbhP44+39D5IZI8l8Xnx4nweznqolih+JaDCt/7m3BvCXrPxqWDAHlGgmIZcoFMS9lU58zY5tUU2dALlz+j3Ju4x90grsr3fCiIziipX/RYRC3gzoUoLVxTfIrQLQ3Jgvsxm4UA6LIuR7gj/gcwpjMYt507FAuhBJdSB8kKlBoYMtul0vLegfMPl55BfPXdNEMdLg==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384133", "MessageGroupId":"6", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"1eea03c3f7e782c7bdc2f2a917f40389314733ff39f5ab16219580c0109ade98", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"6aae5e0be0dc30894275756efda242e5", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" }, { "messageId":"d8b02313-c025-4c48-bd36-73555186baa2", "receiptHandle":"AQEB9dBiqbg3IFxWJDw8BI6jWdknnHSQM/COWjyiWQwhlWqd1WTxm0o3zFokJDkR4LtKrCLe03PH4LDNF5x66fCdj8YHXHS8nYOqC2/bfDyc6ewDS3lLNvbWDI5KhHZtcn0dgTfDoX16ron8KBcYmUztEpRFb/Ytj85eKFg6yIJhVgx0sYpCR4k39ueaWGqo6YzYXJuyDJ9q07T1wMnaAnFPoOnBfa0dOf0q/IGj+OcNHldTVTwRxvfQEQRkRq4lbORfmtjlH6utuEM/rm+5hR2Xfg==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384135", "MessageGroupId":"1", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"e92f65051dc808cb4a9bf23a1c9f143ae4cf3029acd7502f5fb020c3ab0fc502", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"737c6afff14f8ec8db181e8c7cb18589", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" }, { "messageId":"bf3a1af3-b849-4f9f-abc4-d254f741ea1e", "receiptHandle":"AQEBfBDdLA6E4PBWMo7ReyPW1O/BrY86m55eicY8fwT7U+0dkSLdY6GGYp3EeOxBoUucHBkXTgceujq/TO/O5NilJAN7KN1xyHwBEU26rOpQLE2n4UuscbLEVmSh5YzT877CfHRVtQ29QaHseNIrmzpCG3KTFWexZbBj00VOXTDf6+YzcJ+QlvtRSoF37yS4B99a69zMTevaJ/7+UeUE9l24ltNOzIZrN4p6ylPRKb8Vbk7ns2ZUWZC3p/yPK+fDhbCmoOOH5LEWIKUVLF/QIJVuNg==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384136", "MessageGroupId":"1", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"33d2b5ce7da1725dc7b73195aa3264a087363808af93fddabbd83a447e66ce85", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"40c58ee85cc790638034668b73f8b444", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" }, { "messageId":"bf3a1af3-b849-4f9f-abc4-ssssa", "receiptHandle":"AQEBfBDdLA6E4PBWMo7ReyPW1O/BrY86m55eicY8fwT7U+0dkSLdY6GGYp3EeOxBoUucHBkXTgceujq/TO/O5NilJAN7KN1xyHwBEU26rOpQLE2n4UuscbLEVmSh5YzT877CfHRVtQ29QaHseNIrmzpCG3KTFWexZbBj00VOXTDf6+YzcJ+QlvtRSoF37yS4B99a69zMTevaJ/7+UeUE9l24ltNOzIZrN4p6ylPRKb8Vbk7ns2ZUWZC3p/yPK+fDhbCmoOOH5LEWIKUVLF/QIJVuNg==", "body": "{\"item\": {\"laptop\": \"amd\"}}", "attributes":{ "ApproximateReceiveCount":"1", "SentTimestamp":"1703675223472", "SequenceNumber":"18882884930918384136", "MessageGroupId":"3", "SenderId":"AIDAS5S4WFUBFAUCROFBE", "MessageDeduplicationId":"33d2b5ce7da1725dc7b73195aa3264a087363808af93fddabbd83a447e66ce85", "ApproximateFirstReceiveTimestamp":"1703675223484" }, "messageAttributes":{ }, "md5OfBody":"40c58ee85cc790638034668b73f8b444", "eventSource":"aws:sqs", "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo", "awsRegion":"us-west-1" } ] }Before
AFTER
Checklist
If your change doesn't seem to apply, please leave them unchecked.
Is this a breaking change?
RFC issue number:
Checklist:
Acknowledgment
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.