[Filebeat/Filestream] Fix missing last few lines of a file#47247
Merged
belimawr merged 57 commits intoNov 13, 2025
Conversation
This commit prevents files to be closed due to inactivity if their size has changed. A race condition could happen when the file watcher would send a write event, the harvester was still running, so a new one would not start, however at about the same time, the inactive timeout would be reached, closing the harvester before the data that triggered the write event was ingested. This commit fixes it by checking if the file size has changed before closing the file due to inactivity.
…heck-file-before-exit
Co-authored-by: Orestis Floros <orestisflo@gmail.com>
Co-authored-by: Orestis Floros <orestisflo@gmail.com>
…heck-file-before-exit
This reverts commit c5dd8ab.
When a harvester is closed it notifies the file path and size to the FSWatcher. When the FSWatcher runs its scan, if there was a notification from a harvester to that file, the size informed by the harvester is used instead of the one found in the FSWatcher history.
Contributor
🤖 GitHub commentsExpand to view the GitHub comments
Just comment with:
|
Contributor
There was a problem hiding this comment.
Pull Request Overview
This PR fixes a bug where the filestream input could miss ingesting lines when a file becomes inactive during a harvester backoff period and new data is written. The key changes implement a notification mechanism that allows the file watcher to track when harvesters close and update its internal state accordingly.
- Adds harvester closure notifications to the file watcher to prevent missed data ingestion
- Introduces mutex protection for offset and lastTimeRead to ensure atomic updates
- Refactors test infrastructure to use logptest for improved test logging
Reviewed Changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| go.mod | Adds temporary replace directive for elastic-agent-libs dependency |
| filebeat/input/filestream/prospector.go | Sets observer channel for harvester notifications |
| filebeat/input/filestream/prospector_test.go | Adds NotifyChan method to test mocks |
| filebeat/input/filestream/internal/input-logfile/store.go | Splits isDeleted into locked and unlocked versions |
| filebeat/input/filestream/internal/input-logfile/publish.go | Uses unlocked version of isDeleted when lock already held |
| filebeat/input/filestream/internal/input-logfile/manager.go | Adds Path method to Source interface |
| filebeat/input/filestream/internal/input-logfile/manager_test.go | Updates test mocks to implement new Path method |
| filebeat/input/filestream/internal/input-logfile/harvester.go | Implements harvester closure notification mechanism |
| filebeat/input/filestream/internal/input-logfile/fswatch.go | Adds size tracking and Size/SetSize methods to FileDescriptor |
| filebeat/input/filestream/identifier.go | Implements Path method for fileSource |
| filebeat/input/filestream/fswatch.go | Implements notification handling and state updates in file watcher |
| filebeat/input/filestream/filestream.go | Adds mutex protection for offset and lastTimeRead |
| filebeat/input/filestream/environment_test.go | Migrates to logptest for better test logging |
| filebeat/input/filestream/input_delete_integration_test.go | Updates to use new test logger structure |
| filebeat/input/filestream/input_integration_test.go | Adds integration test for data-after-close-inactive scenario |
| filebeat/input/filestream/filestream_test.go | Adds benchmark tests for offset/lastTimeRead update approaches |
| changelog/fragments/1760488402-Prevent-modified-files-close-due-to-inactivity.yaml | Documents the bug fix in changelog |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
rdner
reviewed
Nov 7, 2025
…:belimawr/beats into 46923-inactive-check-file-before-exit-v2
rdner
approved these changes
Nov 13, 2025
Member
rdner
left a comment
There was a problem hiding this comment.
Thank you for fixing this and making sure there is no performance regression!
Contributor
|
@Mergifyio backport 9.1 9.2 |
Contributor
✅ Backports have been createdDetails
|
mergify Bot
pushed a commit
that referenced
this pull request
Nov 13, 2025
Filestream could miss ingesting the last few lines of a file when the following happened: - The Harvester reaches EOF and stops on its backoff - The inactive check that runs on its own go routine marks the file as inactive and cancels the reader/harvester context. - The file watcher, that runs on its own goroutine, detects a change in file size and tries to start a new harvester. The file watcher updates its internal state of the file as its current size. - The harvester fails to start because there is one already running (the one blocked on backoff wait). - The backoff expires and the Harvester resumes running and exits right away. - The file watcher has a state (size) for the file that is different than what was actually ingested, so it does not try to start a new harvester until there is another change in the file. This makes Filebeat to miss the last few lines added to the file. This commit fixes this problem by making the harvester notify the file watcher when it stops and the amount of data is has read. During the scan the file watcher can replace its internal state by the harvester, allowing it to start a new harvester if necessary. --------- Co-authored-by: Orestis Floros <orestisflo@gmail.com> Co-authored-by: Emilio Alvarez Piñeiro <95703246+emilioalvap@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> (cherry picked from commit 3fa1a5e)
mergify Bot
pushed a commit
that referenced
this pull request
Nov 13, 2025
Filestream could miss ingesting the last few lines of a file when the following happened: - The Harvester reaches EOF and stops on its backoff - The inactive check that runs on its own go routine marks the file as inactive and cancels the reader/harvester context. - The file watcher, that runs on its own goroutine, detects a change in file size and tries to start a new harvester. The file watcher updates its internal state of the file as its current size. - The harvester fails to start because there is one already running (the one blocked on backoff wait). - The backoff expires and the Harvester resumes running and exits right away. - The file watcher has a state (size) for the file that is different than what was actually ingested, so it does not try to start a new harvester until there is another change in the file. This makes Filebeat to miss the last few lines added to the file. This commit fixes this problem by making the harvester notify the file watcher when it stops and the amount of data is has read. During the scan the file watcher can replace its internal state by the harvester, allowing it to start a new harvester if necessary. --------- Co-authored-by: Orestis Floros <orestisflo@gmail.com> Co-authored-by: Emilio Alvarez Piñeiro <95703246+emilioalvap@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> (cherry picked from commit 3fa1a5e) # Conflicts: # filebeat/input/filestream/environment_test.go # filebeat/input/filestream/filestream.go # filebeat/input/filestream/fswatch.go # filebeat/input/filestream/fswatch_test.go # filebeat/input/filestream/input.go # filebeat/input/filestream/input_delete_integration_test.go # filebeat/input/filestream/internal/input-logfile/fswatch.go # filebeat/input/filestream/internal/input-logfile/harvester.go # filebeat/input/filestream/internal/input-logfile/harvester_test.go # filebeat/input/filestream/prospector_creator.go # filebeat/input/filestream/prospector_creator_test.go # filebeat/tests/integration/filestream_truncation_test.go # libbeat/tests/integration/datagenerator.go
belimawr
added a commit
to belimawr/beats
that referenced
this pull request
Nov 14, 2025
belimawr
added a commit
that referenced
this pull request
Nov 17, 2025
…47619) Filestream could miss ingesting the last few lines of a file when the following happened: - The Harvester reaches EOF and stops on its backoff - The inactive check that runs on its own go routine marks the file as inactive and cancels the reader/harvester context. - The file watcher, that runs on its own goroutine, detects a change in file size and tries to start a new harvester. The file watcher updates its internal state of the file as its current size. - The harvester fails to start because there is one already running (the one blocked on backoff wait). - The backoff expires and the Harvester resumes running and exits right away. - The file watcher has a state (size) for the file that is different than what was actually ingested, so it does not try to start a new harvester until there is another change in the file. This makes Filebeat to miss the last few lines added to the file. This commit fixes this problem by making the harvester notify the file watcher when it stops and the amount of data is has read. During the scan the file watcher can replace its internal state by the harvester, allowing it to start a new harvester if necessary. --------- (cherry picked from commit 3fa1a5e) Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co> Co-authored-by: Orestis Floros <orestisflo@gmail.com> Co-authored-by: Emilio Alvarez Piñeiro <95703246+emilioalvap@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
belimawr
added a commit
that referenced
this pull request
Nov 18, 2025
mergify Bot
pushed a commit
that referenced
this pull request
Nov 19, 2025
Filestream could miss ingesting the last few lines of a file when the following happened: - The Harvester reaches EOF and stops on its backoff - The inactive check that runs on its own go routine marks the file as inactive and cancels the reader/harvester context. - The file watcher, that runs on its own goroutine, detects a change in file size and tries to start a new harvester. The file watcher updates its internal state of the file as its current size. - The harvester fails to start because there is one already running (the one blocked on backoff wait). - The backoff expires and the Harvester resumes running and exits right away. - The file watcher has a state (size) for the file that is different than what was actually ingested, so it does not try to start a new harvester until there is another change in the file. This makes Filebeat to miss the last few lines added to the file. This commit fixes this problem by making the harvester notify the file watcher when it stops and the amount of data is has read. During the scan the file watcher can replace its internal state by the harvester, allowing it to start a new harvester if necessary. --------- Co-authored-by: Orestis Floros <orestisflo@gmail.com> Co-authored-by: Emilio Alvarez Piñeiro <95703246+emilioalvap@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> (cherry picked from commit 3fa1a5e) # Conflicts: # filebeat/input/filestream/environment_test.go # filebeat/input/filestream/filestream.go # filebeat/input/filestream/fswatch.go # filebeat/input/filestream/fswatch_test.go # filebeat/input/filestream/input.go # filebeat/input/filestream/input_delete_integration_test.go # filebeat/input/filestream/internal/input-logfile/fswatch.go # filebeat/input/filestream/internal/input-logfile/harvester.go # filebeat/input/filestream/internal/input-logfile/harvester_test.go # filebeat/input/filestream/internal/input-logfile/manager.go # filebeat/input/filestream/internal/input-logfile/manager_test.go # filebeat/input/filestream/internal/input-logfile/store.go # filebeat/input/filestream/internal/input-logfile/store_test.go # filebeat/input/filestream/prospector_creator.go # filebeat/tests/integration/filestream_truncation_test.go # libbeat/tests/integration/datagenerator.go
9 tasks
andrzej-stencel
pushed a commit
to andrzej-stencel/beats
that referenced
this pull request
Dec 1, 2025
…7247) Filestream could miss ingesting the last few lines of a file when the following happened: - The Harvester reaches EOF and stops on its backoff - The inactive check that runs on its own go routine marks the file as inactive and cancels the reader/harvester context. - The file watcher, that runs on its own goroutine, detects a change in file size and tries to start a new harvester. The file watcher updates its internal state of the file as its current size. - The harvester fails to start because there is one already running (the one blocked on backoff wait). - The backoff expires and the Harvester resumes running and exits right away. - The file watcher has a state (size) for the file that is different than what was actually ingested, so it does not try to start a new harvester until there is another change in the file. This makes Filebeat to miss the last few lines added to the file. This commit fixes this problem by making the harvester notify the file watcher when it stops and the amount of data is has read. During the scan the file watcher can replace its internal state by the harvester, allowing it to start a new harvester if necessary. --------- Co-authored-by: Orestis Floros <orestisflo@gmail.com> Co-authored-by: Emilio Alvarez Piñeiro <95703246+emilioalvap@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
andrzej-stencel
pushed a commit
to andrzej-stencel/beats
that referenced
this pull request
Dec 1, 2025
pierrehilbert
pushed a commit
that referenced
this pull request
Dec 4, 2025
…es of a file (#47749) * [WIP] * Fix tests * Fix deadlock * Fix tests * Update Filestream default config function * Update notice.txt * Update tests to new testing framework version * Cleanup comments * Update/fix libbeat tests * Fix more tests * Fix more integration tests * Fix FIPS/ECH integration test
mergify Bot
pushed a commit
that referenced
this pull request
Dec 4, 2025
…es of a file (#47749) * [WIP] * Fix tests * Fix deadlock * Fix tests * Update Filestream default config function * Update notice.txt * Update tests to new testing framework version * Cleanup comments * Update/fix libbeat tests * Fix more tests * Fix more integration tests * Fix FIPS/ECH integration test (cherry picked from commit 36109da) # Conflicts: # filebeat/input/filestream/environment_test.go # filebeat/input/filestream/filestream_test.go # filebeat/input/filestream/input.go # filebeat/input/filestream/internal/input-logfile/manager.go # filebeat/input/filestream/internal/input-logfile/manager_test.go # filebeat/input/filestream/internal/input-logfile/store.go # filebeat/input/filestream/internal/input-logfile/store_test.go # filebeat/input/filestream/prospector_creator_test.go # filebeat/tests/integration/autodiscover_test.go # filebeat/tests/integration/filestream_test.go # go.mod # libbeat/tests/integration/dashboard_test.go # libbeat/tests/integration/framework.go
9 tasks
belimawr
added a commit
that referenced
this pull request
Dec 8, 2025
…Fix missing last few lines of a file (#47911) (cherry picked from commit 36109da) # Conflicts: # filebeat/input/filestream/environment_test.go # filebeat/input/filestream/filestream_test.go # filebeat/input/filestream/input.go # filebeat/input/filestream/internal/input-logfile/manager.go # filebeat/input/filestream/internal/input-logfile/manager_test.go # filebeat/input/filestream/internal/input-logfile/store.go # filebeat/input/filestream/internal/input-logfile/store_test.go # filebeat/input/filestream/prospector_creator_test.go # filebeat/tests/integration/autodiscover_test.go # filebeat/tests/integration/filestream_test.go # go.mod # libbeat/tests/integration/dashboard_test.go # libbeat/tests/integration/framework.go --------- Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co> Co-authored-by: Andrzej Stencel <andrzej.stencel@elastic.co>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Proposed commit message
Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration files./changelog/fragmentsusing the changelog tool.## Disruptive User ImpactAuthor's Checklist
How to test this PR locally
Run the tests
Manual test
Testing this fix manually is possible, but requires you to monitor the
logs and add data to the file being ingested at a specific time.
At a very high level, the steps are:
harvester
If you ran this test without the fix from this PR, after
#4Filestream will not try to start any more harvesters for the file,
effectively missing the last few lines.
The best way to manually test this PR is to have two terminals open,
one running Filebeat and another ready to append data to the file
Filebeat is ingesting.
Create a file with at least 1kb of data and write down its size
flog -n 20 > /tmp/flog.log wc -c /tmp/flog.logStart Filebeat with following config:
filebeat.yml
To make the logs easier to read, you can send the logs to stdout
and pipe them through jq:
Wait for the log entry:
'/tmp/flog.log' is inactiveAdd data to the file
flog -n 2 >> /tmp/flog.logWait for the log entry:
File /tmp/flog.log has been updatedWait for the log entry:
Harvester already runningWait for the log entry:
File is inactive. Closing. Path='/tmp/flog.log'Wait for the log entry:
Stopped harvester for fileWait for the log entry:
Updating previous state because harvester was closed. '/tmp/flog.log': xxx, wherexxxis the original file size.Wait for the log entry:
File /tmp/flog.log has been updatedWait for the log entry:
Starting harvester for fileWait for the log entry:
End of file reached: /tmp/flog.log; Backoff now.Ensure all events have been read:
wc -l output*.ndjson.Related issues
## Use cases## Screenshots## LogsBenchmarks
Go Benchmark
This is likely not very relevant to the final form of this PR, but I ran some benchmarks comparing the different strategies to prevent the race condition when accessing the
offsetandlastTimeReadin the harvester, below are the results and the codefilebeat/input/filestream/filestream_test.go
Benchbuilder
Latest release: v9.2.1
9.2.12m43.075351941s12264.000000175.31629.2.148.46343038s41269.000000183.11839.2.12m47.897040994s11912.000000176.51489.2.14m51.107096736s6870.000000178.5985PR version
9.3.02m41.103916351s12414.000000175.37349.3.047.520195331s42088.000000182.56259.3.02m44.102216849s12188.000000175.83899.3.04m56.598482898s6743.000000179.3721