
Conversation


@aocsa aocsa commented Oct 15, 2021

The purpose of this task is to make an ExecNode that can provide the following functionality.

Be able to obtain heuristics about our memory consumption and have a memory consumption threshold.
Be able to write an incoming ExecBatch to disk if memory consumption is above the threshold, storing either the ExecBatch or a handle to its file in a queue.
Provide an API for pulling an ExecBatch from the queue. It should favor pulling all of the batches that are in memory first, and then the ones that are handles to files.
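The three requirements above can be sketched in plain standard C++ (all names here are hypothetical; a string stands in for an ExecBatch and a path string stands in for a spilled file handle):

```cpp
#include <cstddef>
#include <deque>
#include <optional>
#include <string>

// Hypothetical sketch of the queue described above: pushes past the memory
// threshold are "spilled" (here we only keep a file-handle string), and
// Pull() drains in-memory items before spilled ones.
class SpillingQueue {
 public:
  explicit SpillingQueue(std::size_t memory_threshold_bytes)
      : threshold_(memory_threshold_bytes) {}

  void Push(std::string batch, std::size_t batch_bytes) {
    if (in_memory_bytes_ + batch_bytes > threshold_) {
      // Above the threshold: pretend we wrote the batch to disk and
      // keep only a handle to the file.
      spilled_.push_back("/tmp/spill_" + std::to_string(spilled_.size()));
    } else {
      in_memory_bytes_ += batch_bytes;
      in_memory_.push_back(std::move(batch));
    }
  }

  // Favor in-memory batches first, then handles to spilled files.
  std::optional<std::string> Pull() {
    if (!in_memory_.empty()) {
      std::string out = std::move(in_memory_.front());
      in_memory_.pop_front();
      return out;
    }
    if (!spilled_.empty()) {
      std::string handle = std::move(spilled_.front());
      spilled_.pop_front();
      return handle;  // the caller would read the batch back from this file
    }
    return std::nullopt;
  }

 private:
  std::size_t threshold_;
  std::size_t in_memory_bytes_ = 0;
  std::deque<std::string> in_memory_;  // batches held in memory
  std::deque<std::string> spilled_;    // handles to files on disk
};
```

This is only a sketch of the ordering policy; the real node would store ExecBatch objects and actual file handles, and would consult a memory pool for the heuristics.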


@aocsa aocsa force-pushed the aocsa/ARROW-14330 branch from ff88388 to 1087843 Compare October 29, 2021 18:41
@aocsa aocsa marked this pull request as ready for review October 29, 2021 22:47

@westonpace westonpace left a comment

I like parts of this PR. I'm not sure the node's scheduling is going to work. I think the scheduling prioritization logic and the trigger to resume operation are missing. "Something" needs to be checking memory on a regular basis and, if it frees up, resuming work (prioritizing in-memory data before on-disk data according to the JIRA).

Comment on lines +38 to +77
  std::shared_ptr<Schema> GenerateRandomSchema(size_t num_inputs) {
    static std::vector<std::shared_ptr<DataType>> some_arrow_types = {
        arrow::null(),    arrow::boolean(), arrow::int8(),    arrow::int16(),
        arrow::int32(),   arrow::int64(),   arrow::float16(), arrow::float32(),
        arrow::float64(), arrow::utf8(),    arrow::binary(),  arrow::date32()};

    std::vector<std::shared_ptr<Field>> fields(num_inputs);
    std::default_random_engine gen(42);
    std::uniform_int_distribution<int> types_dist(
        0, static_cast<int>(some_arrow_types.size()) - 1);
    for (size_t i = 0; i < num_inputs; i++) {
      int random_index = types_dist(gen);
      auto col_type = some_arrow_types.at(random_index);
      fields[i] =
          field("column_" + std::to_string(i) + "_" + col_type->ToString(), col_type);
    }
    return schema(fields);
  }

  void GenerateBatchesFromSchema(const std::shared_ptr<Schema>& schema,
                                 size_t num_batches, BatchesWithSchema* out_batches,
                                 int multiplicity = 1, int64_t batch_size = 4) {
    if (num_batches == 0) {
      auto empty_record_batch = ExecBatch(*rng_.BatchOf(schema->fields(), 0));
      out_batches->batches.push_back(empty_record_batch);
    } else {
      for (size_t j = 0; j < num_batches; j++) {
        out_batches->batches.push_back(
            ExecBatch(*rng_.BatchOf(schema->fields(), batch_size)));
      }
    }

    size_t batch_count = out_batches->batches.size();
    for (int repeat = 1; repeat < multiplicity; ++repeat) {
      for (size_t i = 0; i < batch_count; ++i) {
        out_batches->batches.push_back(out_batches->batches[i]);
      }
    }
    out_batches->schema = schema;
  }
Member

While this is an interesting concept I don't see why we need random batches to test a data holder.

I guess instead of random we could just do comprehensive coverage, so that we cover all the types listed here every time.

::arrow::random::RandomArrayGenerator rng_;
};

TEST_F(TestDataHolderNode, TestNonEmpty) {
Member

This will eventually need tests that cover the case where limits are hit and batches are actually persisted.

Comment on lines +75 to +86
  std::string RandomString(std::size_t length) {
    const std::string characters =
        "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    std::random_device random_device;
    std::mt19937 generator(random_device());
    std::uniform_int_distribution<int> distribution(
        0, static_cast<int>(characters.size()) - 1);
    std::string random_string;
    for (std::size_t i = 0; i < length; ++i) {
      random_string += characters[distribution(generator)];
    }
    return random_string;
  }
Member

This probably fits better in a utility class somewhere so it can be shared. It's pretty similar to the existing function in bloom_filter_test.cc

arrow::fs::FileSystemFromUri(cache_storage_root_path, &root_path).ValueOrDie();

file_path_ = root_path + file_name;
status_ = StoreRecordBatch(record_batch, filesystem_, file_path_);
Member

Complex logic shouldn't happen in constructors. Maybe there needs to be a Put method or something.

So there are a few options here, but what we are saying is that we have these different DataHolders that are created from a record_batch. We could make a function like

template <typename DataHolderType> make_data_holder_from_batch(...)

And it would invoke the constructor with something like the filesystem and a path pointing to a file that has already persisted the record batch. This would allow the constructor to be simpler while still providing a common API for creating the data holders from a record_batch.
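The suggestion above could look roughly like the following sketch (all names are hypothetical; stub types stand in for Arrow's RecordBatch and FileSystem). The point is that any I/O happens in the factory, so the constructor only records already-prepared arguments:

```cpp
#include <memory>
#include <string>
#include <utility>

// Stub types standing in for arrow::RecordBatch / arrow::fs::FileSystem.
struct RecordBatch { int num_rows = 0; };
struct FileSystem {};

// Example holder whose constructor only records where the data lives.
struct DiskDataHolder {
  DiskDataHolder(std::shared_ptr<FileSystem> fs, std::string path)
      : fs(std::move(fs)), path(std::move(path)) {}
  std::shared_ptr<FileSystem> fs;
  std::string path;
};

// Hypothetical helper: persist the batch first, then hand the constructor
// only simple arguments (filesystem + path), keeping the constructor free
// of complex logic while still giving all holder types a common entry point.
template <typename DataHolderType>
std::unique_ptr<DataHolderType> MakeDataHolderFromBatch(
    const RecordBatch& batch, std::shared_ptr<FileSystem> fs,
    std::string root_path) {
  // Writing the batch to disk would happen here, outside the constructor.
  std::string file_path = root_path + "/batch.arrow";
  return std::make_unique<DataHolderType>(std::move(fs), std::move(file_path));
}
```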

return batch;
}

class DiskDataHolder : public DataHolder {
Member

Nit: These DataHolder implementations might fit better elsewhere. I'm not sure we want the compute module to depend on the ipc module.

Contributor Author

I was wondering the same thing. Maybe the best place is arrow/dataset/, like DatasetWritingSinkNodeConsumer.

Member

  ARROW_DCHECK(executor_ != nullptr);
  return executor_->Submit(this->stop_source_.token(), [this] {
    auto generator = this->data_holder_manager_->generator();
    auto iterator = MakeGeneratorIterator(std::move(generator));

Maybe we can use a condition variable that waits until there are things to iterate on. When a generator has something to push it can notify that cv.
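The condition-variable pattern suggested above, sketched in standard C++ (hypothetical names): the consumer blocks until the producer signals that there is something to iterate on, instead of spinning.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Minimal producer/consumer sketch: the consumer waits on a condition
// variable, and the producer notifies it whenever an item becomes available.
class NotifyingQueue {
 public:
  void Push(int item) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(item);
    }
    cv_.notify_one();  // wake a waiting consumer
  }

  int BlockingPop() {
    std::unique_lock<std::mutex> lock(mutex_);
    // The predicate guards against spurious wakeups.
    cv_.wait(lock, [this] { return !items_.empty(); });
    int item = items_.front();
    items_.pop_front();
    return item;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<int> items_;
};
```

In the node itself the "item" would be a ready DataHolder rather than an int, but the wait/notify shape is the same.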

  data_holder_manager_ =
      ::arrow::internal::make_unique<DataHolderManager>(plan->exec_context());

  auto status = task_group_.AddTask([this]() -> Result<Future<>> {

I don't think this is how we should push into the next node. We need a way for a node to be able to request this data from the DataHolder. Setting up a task that basically loops through inputs to push them forward doesn't allow for intelligent scheduling of these things in the future.

Two options I see:

  1. Have the nodes that follow DataHolderManager pull from the DataHolderManager. This is relatively straightforward but lacks control.
  2. Have some kind of simple scheduler which pulls from the DataHolderManagers that have accumulated the most HeldData and pushes those tasks forward. This is a bit more work but provides more value in my opinion.
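The second option could be sketched as a scheduler step that always services whichever manager holds the most data (hypothetical names, plain standard C++):

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch of option 2: on each scheduling step, pull from the
// DataHolderManager that has accumulated the most HeldData.
struct ManagerState {
  std::string name;
  std::size_t held_count;  // number of HeldData items waiting
};

// Returns the index of the manager to service next, or -1 if all are empty.
int PickNextManager(const std::vector<ManagerState>& managers) {
  int best = -1;
  std::size_t best_count = 0;
  for (std::size_t i = 0; i < managers.size(); ++i) {
    if (managers[i].held_count > best_count) {
      best_count = managers[i].held_count;
      best = static_cast<int>(i);
    }
  }
  return best;
}
```

A real scheduler would also weigh memory pressure and whether the held data is in memory or on disk, but the selection loop is the core of it.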

struct ExecBatch;

enum class MemoryLevel : int { kGpuLevel, kCpuLevel, kDiskLevel, kNumLevels };

Do we want the memory levels to be more extensible than the ones that we statically define? I think this would be fine for a first pass, but we should think later on about the case where the user wants to define their own memory levels.
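One way the enum could later be made extensible, sketched with hypothetical names: keep the built-in levels but let users register additional ones at runtime instead of baking everything into the type.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch: an ordered registry of memory levels that ships with
// the built-in ones (fastest to slowest) and accepts user-defined levels.
class MemoryLevelRegistry {
 public:
  MemoryLevelRegistry() : levels_{"gpu", "cpu", "disk"} {}

  // Appends a user-defined level and returns its index.
  std::size_t RegisterLevel(std::string name) {
    levels_.push_back(std::move(name));
    return levels_.size() - 1;
  }

  const std::string& NameOf(std::size_t level) const { return levels_.at(level); }
  std::size_t num_levels() const { return levels_.size(); }

 private:
  std::vector<std::string> levels_;
};
```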

@westonpace
Member

I'm going to close this as I don't think it is being pursued any longer and we are implementing these features in other PRs.

@westonpace westonpace closed this Mar 29, 2022