Description
Describe the bug, including details regarding any error messages, version, and platform.
Originally, the InputState was created like this:
static Result<std::unique_ptr<InputState>> Make(
    size_t index, TolType tolerance, bool must_hash, bool may_rehash,
    KeyHasher* key_hasher, ExecNode* node, ExecNode* output,
    std::atomic<int32_t>& backpressure_counter,
    const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
    const std::vector<col_index_t>& key_col_index) {
  constexpr size_t low_threshold = 4, high_threshold = 8;
  std::unique_ptr<BackpressureControl> backpressure_control =
      std::make_unique<BackpressureController>(node, output, backpressure_counter);
...
InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(),
                 inputs[i], this, backpressure_counter_,
                 inputs[i]->output_schema(), indices_of_on_key_[i],
                 indices_of_by_key_[i]));
The BackpressureController is given node (which is inputs[i]) and output (which is this).
After dcdeab7 the code changed to:
static Result<std::unique_ptr<InputState>> Make(
    size_t index, TolType tolerance, bool must_hash, bool may_rehash,
    KeyHasher* key_hasher, AsofJoinNode* node, ExecNode* output,
    std::atomic<int32_t>& backpressure_counter,
    const std::shared_ptr<arrow::Schema>& schema, const col_index_t time_col_index,
    const std::vector<col_index_t>& key_col_index) {
  constexpr size_t low_threshold = 4, high_threshold = 8;
  std::unique_ptr<BackpressureControl> backpressure_control =
      std::make_unique<BackpressureController>(node, output, backpressure_counter);
...
InputState::Make(i, tolerance_, must_hash_, may_rehash_, key_hashers_[i].get(),
                 this, inputs[i], backpressure_counter_,
                 inputs[i]->output_schema(), indices_of_on_key_[i],
                 indices_of_by_key_[i]));
Note that the BackpressureController is now created with node (which is this) and output (which is inputs[i]), so the two arguments are swapped relative to the original code. As a result, when the asof join node decides to pause, it calls PauseProducing on the AsofJoinNode itself rather than on the input, and that method currently does nothing:
void PauseProducing(ExecNode* output, int32_t counter) override {}
void ResumeProducing(ExecNode* output, int32_t counter) override {}
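To make the effect of the swap concrete, here is a minimal standalone sketch. It does not use the real Arrow classes: Node, SourceNode, JoinNode, Controller and SourcePausedAfterPause are hypothetical stand-ins, and the assumption is that the controller simply forwards Pause() to node->PauseProducing(output, counter), as the description above implies.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for ExecNode; only the two backpressure hooks are modeled.
struct Node {
  virtual ~Node() = default;
  virtual void PauseProducing(Node* output, int32_t counter) = 0;
  virtual void ResumeProducing(Node* output, int32_t counter) = 0;
};

// An upstream input that actually reacts to backpressure requests.
struct SourceNode : Node {
  bool paused = false;
  void PauseProducing(Node*, int32_t) override { paused = true; }
  void ResumeProducing(Node*, int32_t) override { paused = false; }
};

// Mirrors the no-op overrides quoted above for AsofJoinNode.
struct JoinNode : Node {
  void PauseProducing(Node*, int32_t) override {}
  void ResumeProducing(Node*, int32_t) override {}
};

// Simplified controller (assumption): forwards pause/resume to `node`,
// passing `output` as the requester. The real code shares an atomic counter.
class Controller {
 public:
  Controller(Node* node, Node* output) : node_(node), output_(output) {}
  void Pause() { node_->PauseProducing(output_, ++counter_); }
  void Resume() { node_->ResumeProducing(output_, ++counter_); }

 private:
  Node* node_;
  Node* output_;
  int32_t counter_ = 0;
};

// Wires a controller in either the original or the swapped argument order,
// requests a pause, and reports whether the input was actually paused.
bool SourcePausedAfterPause(bool swapped) {
  SourceNode input;
  JoinNode join;
  Controller controller =
      swapped ? Controller(&join, &input) : Controller(&input, &join);
  controller.Pause();
  return input.paused;
}
```

In this sketch, SourcePausedAfterPause(false) corresponds to the original wiring (node is the input, output is the join), and SourcePausedAfterPause(true) corresponds to the wiring after dcdeab7, where the pause request lands on the join node's empty override and the input keeps producing.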
Component(s)
C++