Skip to content

Conversation

@baeminbo
Copy link
Contributor

@baeminbo baeminbo commented Aug 6, 2021

Input data for completed instructions (especially, failed instruction) can cause the thread of _GrpcDataChannel._read_inputs() and another thread of _GrpcDataChannel.input_elemnts() to be stuck.

The current implementation makes this issue in the following scenario.

  • Thread A is for gRPC data channel. It reads data elements and puts to queues in _GrpcDataChannel._read_inputs().
  • Thread B is a work thread for an instruction "process_bundle-10". A work thread runs user code and pulls data elements from the queue for the instruction in _GrpcDataChannel.input_elements() from BundleProcessor.process_bundle().
  • Thread C is another work thread for an instruction "process_bundle-12".
  1. (Thread A) Read the first chunk of data elements for an instruction "process_bundle-10". This adds a new queue to _received in _receiving_queue() from _read_inputs() and put elements to the queue.
  2. (Thread B) Pull the first element from the queue for "process_bundle-10" in input_elements() and throw an exception during processing the element. This remove the queue with _clean_receiving_queue in input_elements()'s finally clause.
  3. (Thread A) Read remaining chunks of elements for "process_bundle-10". It will add new queue for "process_bundle-10" to _received again! But, be blocked as queue full (maxsize = 5) indefinitely as there is no work thread pulling data elements of "process_bundle-10".
  4. (Thread C) Wait on data for another instruction "process_bundle-12" but be blocked at input_elements() because Thread A has no progress for "process_bundle-12" data.

As a solution, I suggest managing completed instructions in _GrpcDataChannel to avoid data elements queue to be restored so that gRPC data channel thread and work threads will have no blocking issue.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

ValidatesRunner compliance status (on master branch)

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- Build Status Build Status Build Status Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Python --- Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status ---
XLang Build Status Build Status Build Status Build Status Build Status ---

Examples testing status on various runners

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- --- --- --- --- --- ---
Java --- Build Status
Build Status
Build Status
--- --- --- --- ---
Python --- --- --- --- --- --- ---
XLang --- --- --- --- --- --- ---

Post-Commit SDK/Transform Integration Tests Status (on master branch)

Go Java Python
Build Status Build Status Build Status
Build Status
Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@baeminbo
Copy link
Contributor Author

baeminbo commented Aug 6, 2021

@robertwb I created a PR for easy review. I added a comment. Please let me know if single commit is preferred. I'll squash the commits into one.

@codecov
Copy link

codecov bot commented Aug 6, 2021

Codecov Report

Merging #15293 (b49b7d9) into master (22205ee) will increase coverage by 0.28%.
The diff coverage is 69.04%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master   #15293      +/-   ##
==========================================
+ Coverage   83.78%   84.06%   +0.28%     
==========================================
  Files         439      441       +2     
  Lines       59237    61147    +1910     
==========================================
+ Hits        49632    51406    +1774     
- Misses       9605     9741     +136     
Impacted Files Coverage Δ
...ks/python/apache_beam/runners/worker/data_plane.py 87.70% <69.04%> (-2.90%) ⬇️
sdks/python/apache_beam/transforms/sideinputs.py 92.15% <0.00%> (-1.73%) ⬇️
sdks/python/apache_beam/testing/test_stream.py 91.08% <0.00%> (-1.31%) ⬇️
sdks/python/apache_beam/runners/common.py 88.73% <0.00%> (-0.45%) ⬇️
sdks/python/apache_beam/coders/coders.py 87.80% <0.00%> (-0.23%) ⬇️
...ks/python/apache_beam/runners/worker/sdk_worker.py 88.85% <0.00%> (-0.16%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.51% <0.00%> (-0.10%) ⬇️
sdks/python/apache_beam/transforms/util.py 95.98% <0.00%> (-0.09%) ⬇️
sdks/python/apache_beam/io/gcp/bigquery_tools.py 87.41% <0.00%> (-0.08%) ⬇️
sdks/python/apache_beam/transforms/ptransform.py 93.54% <0.00%> (-0.06%) ⬇️
... and 35 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 22205ee...b49b7d9. Read the comment docs.

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This looks good.

@robertwb
Copy link
Contributor

For the lint/formatter, you can run yapf -irp sdks/python/apache_beam/runners/worker/data_plane.py

@baeminbo baeminbo closed this Aug 12, 2021
@baeminbo baeminbo deleted the grpc-data-channel-patch branch August 12, 2021 11:03
@baeminbo
Copy link
Contributor Author

Accidentally closed and reopen PR.

@baeminbo baeminbo reopened this Aug 12, 2021
@baeminbo baeminbo force-pushed the grpc-data-channel-patch branch from d3deb0a to b49b7d9 Compare August 12, 2021 11:08
@baeminbo
Copy link
Contributor Author

I fixed Python lint test failure but precheck still fails at Python PreCommit. I guess this failure is due to BEAM-12699, not by my change.

@robertwb Could you let me know what I should do to merge this change? thanks!

@baeminbo baeminbo changed the title [WIP] Fix grpc data read thread block with finished instruction_id in _GrpcDataChannel Fix grpc data read thread block with finished instruction_id in _GrpcDataChannel Aug 12, 2021
@robertwb
Copy link
Contributor

Run Python PreCommit

@robertwb
Copy link
Contributor

All looks good now. Thanks for your contribution!

@robertwb robertwb merged commit 216f0d9 into apache:master Aug 17, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants