Conversation

@rtpsw
Contributor

@rtpsw rtpsw commented Mar 1, 2023

See #34391

Note that the TestBasic7Forward test-case included in the PR reproduces the hang in the pre-PR code.

@rtpsw rtpsw requested a review from westonpace as a code owner March 1, 2023 08:29
@rtpsw rtpsw marked this pull request as draft March 1, 2023 08:29
@github-actions

github-actions bot commented Mar 1, 2023

⚠️ GitHub issue #34391 has been automatically assigned in GitHub to PR creator.

@rtpsw
Contributor Author

rtpsw commented Mar 1, 2023

Marked as a draft because the current version appears to infrequently hit a race condition. For example:

Unequal at absolute position 2

@@ -0, +0 @@
-null
@@ -2, +1 @@
+13
Expected:
  [
    13,
    13
  ]
Actual:
  [
    null,
    13
  ]
Google Test trace:
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:587: Right-1 type: double
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:585: Right-0 type: uint64
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:583: Left type: time64[us]
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:581: Key type: int32
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:579: Time type: timestamp[us, tz=UTC]
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:577: Iteration: 63
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:567: Types seed: 1677489070467191057
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:968: AsofJoinBasicTest_TestEmpty1Forward_MutateNullKey
/mnt/user1/tscontract/github/rtpsw/arrow2/cpp/src/arrow/compute/exec/asof_join_node_test.cc:967: AsofJoinBasicTest_TestEmpty1Forward_MutateNullKey

This occurred at iteration 63 (which can change, even when the seed is fixed) of the above test-case only, after several runs of the tester had passed.

@rtpsw
Contributor Author

rtpsw commented Mar 1, 2023

cc @westonpace @icexelloss, in case you have an idea about the apparent race condition.

@rtpsw
Contributor Author

rtpsw commented Apr 9, 2023

Coming back to this, I see the same race condition after merging main.

One experiment I made is related to this comment:

  • When I add a lock over mutex_ to UnsyncFront, I get the same results, including the race condition. This means that UnsyncFront is called safely per the comment, i.e., not called concurrently with pop or try_pop.
  • When I add a lock over mutex_ to UnsyncSize, I see a deadlock between one thread calling AsofJoinNode::ProcessThread -> ConcurrentQueue::Pop and another calling InputState::Push -> BackpressureConcurrentQueue::Push -> BackpressureConcurrentQueue::DoHandle -> ConcurrentQueue::UnsyncSize. This isn't surprising; it just shows that the comment does not apply to UnsyncSize.

@rtpsw
Contributor Author

rtpsw commented Apr 10, 2023

I fixed the race condition, at least locally, though I now see this macOS CI failure.

In a debug session, I observed that GetCurrentTime() for one of the RHS tables was returning 0 when it was expected to be ahead, leading to the tolerance not accepting this 0 time in CompositeReferenceTable::Emplace and to a null batch being kept, which showed up as a null value in the materialized column and ended up in the output. This scenario occurred only in a future-as-of-join, where GetCurrentTime() returns memo_.current_time_. Following up on this, I noticed that MemoStore was updating current_time_ only upon entry-removal. Apparently, the (not-so-controlled) timing of this entry-removal can cause the race condition. I fixed MemoStore to update the current time upon entry-store, and simplified its updating upon entry-removal.

Following this fix, I did not locally observe the race condition in several runs of the tester, whereas before I observed it after one or two runs. The macOS failure remains, however, and I don't have access to a macOS machine to test on locally.

@rtpsw
Contributor Author

rtpsw commented Apr 15, 2023

For reference, here are the relevant log lines from the macOS CI job failure:

[ RUN      ] AsofJoinNodeTest/AsofJoinBasicTest.TestBasic7Forward/1
/Users/runner/work/arrow/arrow/cpp/src/arrow/testing/gtest_util.cc:486: Failure
Failed
Unequal at absolute position 3

@@ -2, +2 @@
-null
+00:00:00.000012
Expected:
  [
    00:00:00.000010,
    null,
    00:00:00.000012
  ]
Actual:
  [
    00:00:00.000010,
    null,
    null
  ]
Google Test trace:
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:591: Right-1 type: timestamp[s, tz=UTC]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:589: Right-0 type: time64[us]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:587: Left type: int8
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:585: Key type: time64[ns]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:583: Time type: time32[ms]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:581: Iteration: 1
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:572: Types seed: 1681127979841385
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:922: AsofJoinBasicTest_TestBasic7_DoubleByKey
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:926: AsofJoinBasicTest_TestBasic7Forward_DoubleByKey
/Users/runner/work/arrow/arrow/cpp/src/arrow/testing/gtest_util.cc:486: Failure
Failed
Unequal at absolute position 3

@@ -1, +1 @@
-1970-01-01 00:00:00.000011
+null
Expected:
  [
    1970-01-01 00:00:00.000010,
    null,
    1970-01-01 00:00:00.000012
  ]
Actual:
  [
    1970-01-01 00:00:00.000010,
    1970-01-01 00:00:00.000011,
    1970-01-01 00:00:00.000012
  ]
Google Test trace:
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:591: Right-1 type: time64[us]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:589: Right-0 type: timestamp[us, tz=UTC]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:587: Left type: time64[us]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:585: Key type: int32
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:583: Time type: timestamp[ms, tz=UTC]
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:581: Iteration: 9
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:572: Types seed: 1681127979841385
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:922: AsofJoinBasicTest_TestBasic7_DoubleByKey
/Users/runner/work/arrow/arrow/cpp/src/arrow/acero/asof_join_node_test.cc:926: AsofJoinBasicTest_TestBasic7Forward_DoubleByKey
[  FAILED  ] AsofJoinNodeTest/AsofJoinBasicTest.TestBasic7Forward/1, where GetParam() = AsofJoinBasicParams: DoubleByKey (202 ms)

@rtpsw
Contributor Author

rtpsw commented Apr 15, 2023

The recent commit allowed the macOS CI job that previously failed to succeed this time (the single-job failure this time is irrelevant). Of course, this does not prove the as-of-join code is now free of race conditions, yet the explanation below may help in reasoning about what's going on.

The main idea in this recent fix is to ensure MemoStore has a valid current time when GetCurrentTime() is called, since in a future-as-of-join GetCurrentTime() returns memo_.current_time_. The initial current time of 0, which is set by the MemoStore constructor, is invalid; it was observed in the debug session described above and led to a null-value in the output. The fix includes the following:

  1. Changing MemoStore::current_time_ to be an atomic number.
  2. Adding MemoStore::UpdateTime, which updates MemoStore::current_time_ in a multi-threaded-safe way to the given time if the latter is greater.
  3. Invoking MemoStore::UpdateTime with the time of the first row of a received batch. This happens-before (in the multi-threading sense of the term) the batch is pushed to the queue, and therefore ensures that MemoStore::current_time_ is valid by the time GetLatestTime() is called when the batch is processed out of the queue.

Note that I believe the as-of-join node's process-thread is not responsible for the race condition. It just processes batches in the order they were received, regardless of this being done in a separate thread. I believe the race condition is due to the non-deterministic order of arrival of batches at the as-of-join node, and that there's an order of arrival that drives the code (before the recent fix) to access an invalid MemoStore::current_time_. If so, this order could be found and then simulated in a test-case, though I think this task can be left for later.

@rtpsw rtpsw marked this pull request as ready for review April 15, 2023 13:52
@rtpsw
Contributor Author

rtpsw commented Apr 15, 2023

cc @westonpace, @icexelloss

@icexelloss
Contributor

Thanks @rtpsw. I don't have time to look at this closely today, but I will try to take a look soon.

@icexelloss
Contributor

icexelloss commented Apr 17, 2023

A quick look and a couple of questions:

The initial current time of 0, which is set by the MemoStore constructor, is invalid;

Why would the code later be unable to detect that the current time is invalid? (I assume this is invalid because it hasn't got any input?)

I believe the race condition is due to the non-deterministic order of arrival of batches to the as-of-join node,

This is surprising - why would this happen in serial execution? And what evidence makes you believe this is happening?

@rtpsw
Contributor Author

rtpsw commented Apr 17, 2023

Why would the code later unable to detect that the current time is invalid? (I assume this is invalid because it hasn't got any input?)

Detecting is not good enough. GetCurrentTime() needs a valid time when called, so there needs to be code that ensures this. The MemoStore constructor can't do this because at that time there is no input yet.

This is surprising - why would this happen in serial execution? And what evidence makes you believe this is happening?

I don't (yet?) have evidence to directly support this. What I noted is evidence against the race condition being due to the process thread, and so I suspect the order of input batches. I suspect that when a node has at least two inputs, the order of batches to that node (i.e., across its inputs) may still be non-deterministic even with serial execution. Perhaps @westonpace can shed some light here, or we could investigate.

@icexelloss
Contributor

icexelloss commented Apr 17, 2023

I suspect that when a node has at least two inputs, the order of batches to that node (i.e., across its inputs) may still be non-deterministic even with serial execution.

Oh, I see. Yeah, I can see that happening — there are no guarantees of total ordering if there are multiple sources. There are only guarantees of ordering of batches within a single source table.

@icexelloss
Contributor

icexelloss commented Apr 17, 2023

Detecting is not good enough. GetCurrentTime() needs a valid time when called, so there needs to be code that ensures this. The MemoStore constructor can't do this because at that time there is no input yet.

If the GetCurrentTime() returns something that the caller can detect as invalid (e.g., negative value), then the caller can treat it as if there is no data and skip the current processing?

@rtpsw
Contributor Author

rtpsw commented Apr 17, 2023

If the GetCurrentTime() returns something that the caller can detect as invalid (e.g., negative value), then the caller can treat it as if there is no data and skip the current processing?

That might be doable, but I doubt it would be simple. The calls to GetCurrentTime are in a deep enough stack that would be annoying to unwind while carrying some status indicating the invalid time condition.

Member

@westonpace westonpace left a comment


A few minor suggestions.


void UpdateTime(OnType ts) {
OnType prev_time = current_time_;
while (prev_time < ts && !current_time_.compare_exchange_weak(prev_time, ts)) {
Member


Why use a while loop here instead of a single call to compare_exchange_strong?

Contributor Author


This is because compare_exchange_weak (see doc) may not find the expected value of prev_time from line 260 by the time the exchange is attempted, due to a concurrent update (expected to be rare). In this case, false is returned and prev_time is updated to the current value of current_time_. Then another iteration is tried. This is normal CAS-loop logic.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting review Awaiting review labels Apr 18, 2023
@westonpace
Member

I agree that order between sources is not guaranteed. In other words, the first batch might be Source1Batch1 or it might be Source2Batch1 but it will not be Source1Batch2.

Another potential source of contention / race conditions is the interplay between InputReceived and the process thread. Normally InputReceived runs first and then the process thread runs afterwards. However, if the process thread is currently processing, it might run at the same time as InputReceived and there could be potential race conditions on the state. Though I'm pretty sure you're aware of this already :)

@rtpsw
Contributor Author

rtpsw commented Apr 18, 2023

Another potential source of contention / race conditions is the interplay between InputReceived and the process thread. Normally InputReceived runs first and then the process thread runs afterwards. However, if the process thread is currently processing, it might run at the same time as InputReceived and there could be potential race conditions on the state. Though I'm pretty sure you're aware of this already :)

Indeed I'm aware. This scenario doesn't cause trouble because the InputReceived method basically just pushes a batch to the queue.

@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Apr 18, 2023
@rtpsw
Contributor Author

rtpsw commented Apr 18, 2023

In the recent commit, I also included another invocation of UpdateTime, which now returns bool.

@rtpsw
Contributor Author

rtpsw commented Apr 18, 2023

I see a failure on macOS and Windows with the same symptom as seen before for the race condition. Need to debug on at least one of these platforms.

@wjones127 wjones127 self-requested a review April 18, 2023 15:39
Member

@pitrou pitrou left a comment


Thanks for the update @rtpsw ! I can't vouch for the correctness but this PR looks formally fine to me.

@pitrou pitrou added this to the 12.0.1 milestone May 24, 2023
@pitrou pitrou merged commit dcdeab7 into apache:main May 24, 2023
@ursabot

ursabot commented May 30, 2023

Benchmark runs are scheduled for baseline = f45a9e5 and contender = dcdeab7. dcdeab7 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.24% ⬆️0.0%] test-mac-arm
[Failed ⬇️8.5% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.27% ⬆️0.09%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] dcdeab76 ec2-t3-xlarge-us-east-2
[Finished] dcdeab76 test-mac-arm
[Failed] dcdeab76 ursa-i9-9960x
[Finished] dcdeab76 ursa-thinkcentre-m75q
[Finished] f45a9e57 ec2-t3-xlarge-us-east-2
[Finished] f45a9e57 test-mac-arm
[Finished] f45a9e57 ursa-i9-9960x
[Finished] f45a9e57 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot

ursabot commented May 31, 2023

['Python', 'R'] benchmarks have high level of regressions.
ursa-i9-9960x

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels May 31, 2023
Contributor

@icexelloss icexelloss left a comment


@rtpsw Unfortunately this was merged before I had the chance to properly review it. Can you take a look at the comments and create a follow-up PR if needed?

Contributor


Can you please add comments in the code to explain this? It looks like something that could be tricky to understand just from the code.

// when entries with a time less than T are removed, the current time is updated to the
// time of the next (by-time) and now-current entry or to T if no such entry exists.
OnType current_time_;
std::atomic<OnType> current_time_;
Contributor


Can you explain why we change this to atomic?

Contributor Author


This is related to your other question here. There, we need to set the current time on the MemoStore instance given a new batch, and this is done from a different thread (the one handling an incoming input batch) than the one processing the batch (the one running ProcessThread) using the same MemoStore instance. This means we need to synchronize MemoStore.current_time_ between these threads, and so it is made atomic here.

if (rb->num_rows() > 0) {
queue_.Push(rb);
key_hasher_->Invalidate(); // batch changed - invalidate key hasher's cache
memo_.UpdateTime(GetTime(rb.get(), 0)); // time changed - update in MemoStore
Contributor


Can you explain why we add the UpdateTime call here? How was the MemoStore time updated before?

Contributor Author


See this post. UpdateTime is synchronized, and before we didn't need this synchronization.

bool has_entry = opt_entry.has_value();
OnType entry_time = has_entry ? (*opt_entry)->time : TolType::kMinValue;
row_index_t entry_row = has_entry ? (*opt_entry)->row : 0;
bool accepted = has_entry && tolerance.Accepts(lhs_latest_time, entry_time);
Contributor


What does "accepted" mean here?

Contributor Author


Here, accepted is in the same sense as tolerance.Accepts, meaning that the entry's time must (exist and) be within the time interval defined by the tolerance for lhs_latest_time.

@rtpsw
Contributor Author

rtpsw commented May 31, 2023

@rtpsw Unfortunately this was merged before I had the chance to properly review it. Can you take a look at the comments and create a follow-up PR if needed?

Sorry about that. Would adding comments as part of #35838 work? I intend to handle this issue soon.

@pitrou
Member

pitrou commented May 31, 2023

Sorry for merging early. If @icexelloss 's comments can be addressed as part of a standalone PR, to ease reviewing, it would be nice.

@rtpsw
Contributor Author

rtpsw commented May 31, 2023

Can you please add comments in the code to explain this. Looks like something could tricky to understand just from the code.

Will do. In the meantime, I'll note that we do not remove entries earlier than a time ts when ts is not in the past of latest_time, which is the case when latest_time >= ts is false; otherwise, we would be removing entries before processing them.

@icexelloss
Contributor

icexelloss commented May 31, 2023 via email

Member

@westonpace westonpace left a comment


This works but I find it a little confusing. It seems strange to me that the delaying node would be placed after the asof join node. I was thinking it would be placed on the slow input to ensure that it did not deliver any batches.

However, if you were to do that, the entire plan would hang. This is because the delaying node synchronously blocks. This means the CPU thread is taken out of use. When use_threads=false (as it is when we use the asof join node), the only worker thread is blocked.

The reason it works today is because the delay happens after the asof join node and so it is actually blocking the processing thread and not a CPU thread. It was surprising to me that the processing thread was the thread that called InputReceived. This means, if you have something like...

Source -> AsofJoinNode -> Project -> Aggregation

Then the processing thread will be the one calling Project and Aggregation which is odd. However, this issue isn't very relevant to the backpressure problem at hand.

So it seems this works because there is enough data initially delivered to release at least one batch. This batch gets caught in the delaying node, which hangs the processing thread. Since the processing thread is hung, enough data accumulates in the inputs that they are eventually paused.

If we want to proceed with this design then I think that is ok. For the sake of completeness I am including an example of what I had in my mind. A "GatedNode" would not block but it would not emit any batches (they would just queue up in the scheduler) until the gate is unlocked.

So, with this idea, the gated node would be placed over the slow source. You could get rid of the entire idea of "delayed" sources (and all the accompanying sleeps). The plan would be started and we wait to ensure backpressure is hit. Once backpressure is hit we release the gate and then confirm that the plan resumes and finishes.

Feel free to take my idea or continue with this one.

@westonpace
Member

Ignore that; I put this review on the wrong PR.



Development

Successfully merging this pull request may close these issues.

[C++] Future as-of-join-node hangs on distant times

6 participants