Closed

Changes from all commits (50 commits)
3f24850
Added low-level file I/O spans
joosthooz Feb 13, 2023
d106c42
Added spans to datasetwriter for popstagedbatches and the actual IO w…
joosthooz Feb 13, 2023
8aba916
Added a span for writing parquet.
joosthooz Feb 13, 2023
f9da7ad
Removed ReadBatch spans because they were orphans and didn't represen…
joosthooz Feb 13, 2023
10a80d6
File I/O spans now have actual read bytes as property
joosthooz Feb 16, 2023
ab34c08
Update span name
joosthooz Feb 16, 2023
4e48581
Added spans to ipc compression functions because they are called by a…
joosthooz Feb 16, 2023
1a1307a
Added a span to a parallelFor lambda in parquet: GetRecordBatchReader…
joosthooz Feb 16, 2023
56b8535
Creating a single span for a projection instead of 1 for each expression
joosthooz Feb 16, 2023
56c52af
Added span type attributes to task submission spans.
joosthooz Feb 16, 2023
815d317
No longer creating a span when task submission is queued (throttled)
joosthooz Feb 16, 2023
481bbe4
Creating a string_view from the task name basic_string_view
joosthooz Feb 16, 2023
1f3adee
Added the size of the output to some spans.
joosthooz Feb 17, 2023
43a7fba
Added (presumably missing) START_SPAN to OrderBySinkNode
joosthooz Feb 17, 2023
af56a27
Removed unintended line
joosthooz Feb 27, 2023
24b5ff8
Do not create toplevel task span at task submission
joosthooz Feb 27, 2023
37e84e4
Removed arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next span in…
joosthooz Feb 28, 2023
ed397fc
Added the size of the batch outputted by CSV reader to the span.
joosthooz Feb 28, 2023
2f7fa97
Added span for processing the first block.
joosthooz Mar 1, 2023
38c4639
Creating explicit ProcessMorsel and DatasetScan spans
joosthooz Mar 1, 2023
8764d34
Added span for keeping track of re-chunking during scanning
joosthooz Mar 2, 2023
58c9db4
Reverting back to PeekAndMakeAsync,
joosthooz Mar 2, 2023
ee28b21
Posting datasetwriter backpressure events to the asyncscheduler span
joosthooz Mar 2, 2023
1163ef1
Added push and pop spans to the datasetwriter that allow tracking sta…
joosthooz Mar 2, 2023
07343d7
Formatting
joosthooz Mar 2, 2023
a3a126d
Fixed typo in span attribute
joosthooz Mar 17, 2023
e1a0a0f
Added a surrogate arrow::csv::ReadNextAsync span for the first block …
joosthooz Mar 17, 2023
6a9b635
WIP writing some more extensive documentation about tracing Acero
joosthooz Mar 17, 2023
ecd80c4
Merge remote-tracking branch 'origin/main' into GH-33880-improve-trac…
joosthooz Apr 6, 2023
a28a1db
Added helper function ATTRIBUTE_ON_CURRENT_SPAN
joosthooz Jun 2, 2023
a665bb6
Merge remote-tracking branch 'origin/main' into GH-33880-improve-tracing
joosthooz Jun 2, 2023
2129678
Reverted commenting out task submission tracing
joosthooz Jun 5, 2023
06bcc1e
Fix typo
joosthooz Jun 5, 2023
c8fa9f6
Using helper function in some more places
joosthooz Jun 5, 2023
d315264
Removed some unused code from the datasetwriter
joosthooz Jul 11, 2023
407bcb2
Propagating span context to SimpleTask submission functions
joosthooz Jul 11, 2023
ccdf2ec
Removed TODO, some typos
joosthooz Jul 12, 2023
45f03fa
Formatting
joosthooz Jul 12, 2023
4e0d497
Merge branch 'main' into GH-33880-improve-tracing
joosthooz Jul 13, 2023
42fc7b6
Added missing empty macro for ATTRIBUTE_ON_CURRENT_SPAN()
joosthooz Jul 13, 2023
b64ca22
Revert "Propagating span context to SimpleTask submission functions"
joosthooz Jul 11, 2023
898b093
Propagating parent span through task submission
joosthooz Jul 14, 2023
78ea497
Increased queue size for tracing to avoid dropping spans
joosthooz Jul 14, 2023
4028cd6
Removed task submission trace functions
joosthooz Jul 14, 2023
19522e5
Formatting
joosthooz Jul 14, 2023
b9df0be
Updated some namespace usages
joosthooz Jul 17, 2023
07d5012
Wrote some documentation about the structure of traces
joosthooz Jul 17, 2023
14874ca
Merge remote-tracking branch 'origin/main' into GH-33880-improve-tracing
joosthooz Oct 12, 2023
50e808c
Removed some of the more intrusive Otel spans
joosthooz Oct 12, 2023
fc9c659
Removed the low-level File I/O spans, there's just too many of them a…
joosthooz Oct 12, 2023
11 changes: 8 additions & 3 deletions cpp/src/arrow/acero/filter_node.cc
@@ -73,12 +73,12 @@ class FilterNode : public MapNode {
Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
SimplifyWithGuarantee(filter_, batch.guarantee));

arrow::util::tracing::Span span;
START_COMPUTE_SPAN(span, "Filter",
{{"filter.expression", ToStringExtra()},
{"filter.expression.simplified", simplified_filter.ToString()},
{"filter.length", batch.length}});
{"filter.length", batch.length},
{"input_batch.size_bytes", batch.TotalBufferSize()}});

ARROW_ASSIGN_OR_RAISE(
Datum mask, ExecuteScalarExpression(simplified_filter, batch,
@@ -87,8 +87,10 @@ class FilterNode : public MapNode {
if (mask.is_scalar()) {
const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
if (mask_scalar.is_valid && mask_scalar.value) {
ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", batch.TotalBufferSize());
return batch;
}
ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", 0);
return batch.Slice(0, 0);
}

@@ -101,7 +103,10 @@
if (value.is_scalar()) continue;
ARROW_ASSIGN_OR_RAISE(value, Filter(value, mask, FilterOptions::Defaults()));
}
return ExecBatch::Make(std::move(values));
auto filtered_batch = ExecBatch::Make(std::move(values));
ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes",
filtered_batch->TotalBufferSize());
return filtered_batch;
}

protected:
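The filter hunk above establishes the pattern used throughout this PR: one compute span per batch, input size recorded when the span opens, and output size attached on whichever exit path actually returns. A minimal sketch of that shape, assuming the macros behave as used here (START_COMPUTE_SPAN opens a span with initial attributes, ATTRIBUTE_ON_CURRENT_SPAN attaches a key/value to the innermost active span); DoFilter is a hypothetical stand-in for the real masking logic:

#include "arrow/util/tracing_internal.h"

Result<ExecBatch> ProcessBatchTraced(ExecBatch batch) {
  arrow::util::tracing::Span span;
  // Record the input size up front so even an early return is accounted for.
  START_COMPUTE_SPAN(span, "Filter",
                     {{"filter.length", batch.length},
                      {"input_batch.size_bytes", batch.TotalBufferSize()}});

  ARROW_ASSIGN_OR_RAISE(ExecBatch out, DoFilter(std::move(batch)));  // hypothetical

  // Every return path tags the span with the bytes it actually produced.
  ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", out.TotalBufferSize());
  return out;
}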
13 changes: 8 additions & 5 deletions cpp/src/arrow/acero/project_node.cc
@@ -79,19 +79,22 @@ class ProjectNode : public MapNode {

Result<ExecBatch> ProcessBatch(ExecBatch batch) override {
std::vector<Datum> values{exprs_.size()};
arrow::util::tracing::Span span;
START_COMPUTE_SPAN(span, "Project",
{{"project.length", batch.length},
{"input_batch.size_bytes", batch.TotalBufferSize()}});
for (size_t i = 0; i < exprs_.size(); ++i) {
arrow::util::tracing::Span span;
START_COMPUTE_SPAN(span, "Project",
{{"project.type", exprs_[i].type()->ToString()},
{"project.length", batch.length},
{"project.expression", exprs_[i].ToString()}});
std::string project_name = "project[" + std::to_string(i) + "]";
ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".type", exprs_[i].type()->ToString());
ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".expression", exprs_[i].ToString());
ARROW_ASSIGN_OR_RAISE(Expression simplified_expr,
SimplifyWithGuarantee(exprs_[i], batch.guarantee));

ARROW_ASSIGN_OR_RAISE(
values[i], ExecuteScalarExpression(simplified_expr, batch,
plan()->query_context()->exec_context()));
}
ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", batch.TotalBufferSize());
return ExecBatch{std::move(values), batch.length};
}

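Note the design choice in this hunk: instead of one child span per expression (which multiplied span volume by the projection width), a single Project span carries indexed attributes such as project[2].type and project[2].expression. That keeps trace volume bounded by batch count rather than expression count, which matters given the later commit that enlarged the tracing queue to avoid dropped spans. The attribute-naming idiom, sketched under the same macro assumptions as above:

// Sketch: one span per batch; per-expression details become indexed
// attributes on that span, e.g. "project[0].type", "project[0].expression".
for (size_t i = 0; i < exprs_.size(); ++i) {
  std::string project_name = "project[" + std::to_string(i) + "]";
  ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".type", exprs_[i].type()->ToString());
  ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".expression", exprs_[i].ToString());
}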
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/sink_node.cc
@@ -539,6 +539,7 @@ struct OrderBySinkNode final : public SinkNode {

Status Finish() override {
arrow::util::tracing::Span span;
START_SPAN(span, std::string(kind_name()) + "::Finish");
ARROW_RETURN_NOT_OK(DoFinish());
return SinkNode::Finish();
}
9 changes: 6 additions & 3 deletions cpp/src/arrow/acero/source_node.cc
@@ -133,6 +133,8 @@ struct SourceNode : ExecNode, public TracedNode {
plan_->query_context()->ScheduleTask(
[this, morsel_length, use_legacy_batching, initial_batch_index, morsel,
has_ordering = !ordering_.is_unordered()]() {
arrow::util::tracing::Span span;
START_SPAN(span, "SourceNode::ProcessMorsel");
int64_t offset = 0;
int batch_index = initial_batch_index;
do {
@@ -163,6 +165,7 @@

Status StartProducing() override {
NoteStartProducing(ToStringExtra());

{
// If another exec node encountered an error during its StartProducing call
// it might have already called StopProducing on all of its inputs (including this
@@ -184,6 +187,9 @@
options.should_schedule = ShouldSchedule::IfDifferentExecutor;
ARROW_ASSIGN_OR_RAISE(Future<> scan_task, plan_->query_context()->BeginExternalTask(
"SourceNode::DatasetScan"));
arrow::util::tracing::Span span;
START_SPAN(span, "SourceNode::DatasetScan");

if (!scan_task.is_valid()) {
// Plan has already been aborted, no need to start scanning
return Status::OK();
@@ -195,9 +201,6 @@
}
lock.unlock();

arrow::util::tracing::Span fetch_batch_span;
auto fetch_batch_scope =
START_SCOPED_SPAN(fetch_batch_span, "SourceNode::ReadBatch");
return generator_().Then(
[this](
const std::optional<ExecBatch>& morsel_or_end) -> Future<ControlFlow<int>> {
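Both new spans here are opened inside the scheduled callables rather than at submission time, matching the earlier commits that stopped creating spans for queued task submissions: the span then measures execution on whatever thread runs the task, not time spent waiting in the scheduler. A minimal sketch of the idiom, assuming ScheduleTask accepts a Status-returning callable as in this file:

plan_->query_context()->ScheduleTask([this]() -> Status {
  arrow::util::tracing::Span span;
  START_SPAN(span, "SourceNode::ProcessMorsel");
  // ... slice the morsel into batches and push them downstream; everything
  // from here until the span is destroyed is attributed to this task ...
  return Status::OK();
});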
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/function.cc
@@ -219,7 +219,7 @@ struct FunctionExecutorImpl : public FunctionExecutor {
}

Result<Datum> Execute(const std::vector<Datum>& args, int64_t passed_length) override {
util::tracing::Span span;
arrow::util::tracing::Span span;

auto func_kind = func.kind();
const auto& func_name = func.name();
14 changes: 13 additions & 1 deletion cpp/src/arrow/csv/reader.cc
@@ -43,12 +43,14 @@
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/byte_size.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/utf8_internal.h"
#include "arrow/util/vector.h"

@@ -881,6 +883,8 @@ class StreamingReaderImpl : public ReaderMixin,
}

Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
util::tracing::Span span;
START_SPAN(span, "arrow::csv::ReadNextAsync");
return record_batch_gen_();
}

@@ -892,6 +896,11 @@
return Status::Invalid("Empty CSV file");
}

// Create an arrow::csv::ReadNextAsync span so that grouping by that name does not
// ignore the work performed for this first block.
util::tracing::Span read_span;
auto scope = START_SCOPED_SPAN(read_span, "arrow::csv::ReadNextAsync");

std::shared_ptr<Buffer> after_header;
ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
ProcessHeader(first_buffer, &after_header));
@@ -911,9 +920,12 @@
auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op));

auto self = shared_from_this();
return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) {
auto init_finished = rb_gen().Then([self, rb_gen, max_readahead,
read_span = std::move(read_span)
](const DecodedBlock& first_block) {
return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0);
});
return init_finished;
}

Future<> InitFromBlock(const DecodedBlock& block,
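The subtle part of this hunk is read_span's lifetime: it is moved into the continuation's capture list, so the surrogate arrow::csv::ReadNextAsync span stays open until the first block has actually been decoded and the callback is destroyed, rather than ending when the surrounding function returns. A distilled sketch of the idiom, assuming Arrow's Future::Then and a movable Span as demonstrated above; the names are illustrative:

Future<int> TracedAsyncStep(std::function<Future<int>()> step) {
  arrow::util::tracing::Span span;
  auto scope = START_SCOPED_SPAN(span, "example::AsyncStep");
  // Moving the span into the capture extends its lifetime, and thus the
  // traced interval, across the asynchronous boundary.
  return step().Then([span = std::move(span)](const int& value) { return value; });
}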
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/dataset_internal.h
@@ -29,8 +29,10 @@
#include "arrow/scalar.h"
#include "arrow/type.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/byte_size.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {
namespace dataset {
@@ -144,6 +146,12 @@ inline RecordBatchGenerator MakeChunkedBatchGenerator(RecordBatchGenerator gen,
[batch_size](const std::shared_ptr<RecordBatch>& batch)
-> ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>> {
const int64_t rows = batch->num_rows();
util::tracing::Span span;
START_SPAN(span, "MakeChunkedBatchGenerator",
{{"target_batch_size_rows", batch_size},
{"batch.size_rows", rows},
{"batch.size_bytes", util::TotalBufferSize(*batch)},
{"output_batches", rows / batch_size + (rows % batch_size != 0)}});
if (rows <= batch_size) {
return ::arrow::MakeVectorGenerator<std::shared_ptr<RecordBatch>>({batch});
}
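The output_batches attribute precomputes how many batches re-chunking will emit: a ceiling division, since any remainder rows form one final short batch. A quick self-contained check of the expression used above:

#include <cassert>
#include <cstdint>

int64_t NumOutputBatches(int64_t rows, int64_t batch_size) {
  // Equivalent to ceil(rows / (double)batch_size) for positive inputs.
  return rows / batch_size + (rows % batch_size != 0);
}

int main() {
  assert(NumOutputBatches(100, 50) == 2);  // exact multiple
  assert(NumOutputBatches(101, 50) == 3);  // remainder adds one short batch
  assert(NumOutputBatches(42, 50) == 1);   // small morsel passes through whole
  return 0;
}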
41 changes: 34 additions & 7 deletions cpp/src/arrow/dataset/dataset_writer.cc
@@ -27,6 +27,7 @@
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/byte_size.h"
#include "arrow/util/future.h"
#include "arrow/util/logging.h"
#include "arrow/util/map.h"
@@ -191,9 +192,13 @@ class DatasetWriterFileQueue {
}

Result<int64_t> PopAndDeliverStagedBatch() {
util::tracing::Span span;
START_SPAN(span, "DatasetWriter::Pop");
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> next_batch, PopStagedBatch());
int64_t rows_popped = next_batch->num_rows();
rows_currently_staged_ -= next_batch->num_rows();
ATTRIBUTE_ON_CURRENT_SPAN("batch.size_rows", next_batch->num_rows());
ATTRIBUTE_ON_CURRENT_SPAN("rows_currently_staged", rows_currently_staged_);
ScheduleBatch(std::move(next_batch));
return rows_popped;
}
@@ -202,7 +207,15 @@
Status Push(std::shared_ptr<RecordBatch> batch) {
uint64_t delta_staged = batch->num_rows();
rows_currently_staged_ += delta_staged;
staged_batches_.push_back(std::move(batch));
{
util::tracing::Span span;
START_SPAN(span, "DatasetWriter::Push",
{{"batch.size_rows", batch->num_rows()},
{"rows_currently_staged", rows_currently_staged_},
{"options_.min_rows_per_group", options_.min_rows_per_group},
{"max_rows_staged", writer_state_->max_rows_staged}});
staged_batches_.push_back(std::move(batch));
}
while (!staged_batches_.empty() &&
(writer_state_->StagingFull() ||
rows_currently_staged_ >= options_.min_rows_per_group)) {
@@ -233,6 +246,18 @@
return DeferNotOk(options_.filesystem->io_context().executor()->Submit(
[self = this, batch = std::move(next)]() {
int64_t rows_to_release = batch->num_rows();
#ifdef ARROW_WITH_OPENTELEMETRY
uint64_t size_bytes = util::TotalBufferSize(*batch);
uint64_t num_buffers = 0;
for (auto column : batch->columns()) {
num_buffers += column->data()->buffers.size();
}
util::tracing::Span span;
START_SPAN(span, "DatasetWriter::WriteNext",
{{"threadpool", "IO"},
{"batch.size_bytes", size_bytes},
{"batch.num_buffers", num_buffers}});
Review comment (Member): Why do I need to know num_buffers?
#endif
Status status = self->writer_->Write(batch);
self->writer_state_->rows_in_flight_throttle.Release(rows_to_release);
return status;
@@ -261,11 +286,6 @@ class DatasetWriterFileQueue {
util::AsyncTaskScheduler* file_tasks_ = nullptr;
};

struct WriteTask {
std::string filename;
uint64_t num_rows;
};

class DatasetWriterDirectoryQueue {
public:
DatasetWriterDirectoryQueue(util::AsyncTaskScheduler* scheduler, std::string directory,
@@ -301,7 +321,6 @@

Status StartWrite(const std::shared_ptr<RecordBatch>& batch) {
rows_written_ += batch->num_rows();
WriteTask task{current_filename_, static_cast<uint64_t>(batch->num_rows())};
if (!latest_open_file_) {
ARROW_RETURN_NOT_OK(OpenFileQueue(current_filename_));
}
@@ -351,6 +370,8 @@ class DatasetWriterDirectoryQueue {
latest_open_file_tasks_ = util::MakeThrottledAsyncTaskGroup(
scheduler_, 1, /*queue=*/nullptr, std::move(file_finish_task));
if (init_future_.is_valid()) {
util::tracing::Span span;
START_SPAN(span, "arrow::dataset::WaitForDirectoryInit");
latest_open_file_tasks_->AddSimpleTask(
[init_future = init_future_]() { return init_future; },
"DatasetWriter::WaitForDirectoryInit"sv);
@@ -362,6 +383,8 @@
uint64_t rows_written() const { return rows_written_; }

void PrepareDirectory() {
util::tracing::Span span;
START_SPAN(span, "arrow::dataset::SubmitPrepareDirectoryTask");
if (directory_.empty() || !write_options_.create_dir) {
return;
}
@@ -383,6 +406,8 @@
if (write_options_.existing_data_behavior ==
ExistingDataBehavior::kDeleteMatchingPartitions) {
init_task = [this, create_dir_cb, notify_waiters_cb, notify_waiters_on_err_cb] {
util::tracing::Span span;
START_SPAN(span, "arrow::dataset::PrepareDirectory");
return write_options_.filesystem
->DeleteDirContentsAsync(directory_,
/*missing_dir_ok=*/true)
@@ -614,12 +639,14 @@ class DatasetWriter::DatasetWriterImpl {
backpressure =
writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows());
if (!backpressure.is_finished()) {
EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyRowsQueued");
Review comment (Member): Why both EVENT and EVENT_ON_CURRENT_SPAN?
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
break;
}
if (will_open_file) {
backpressure = writer_state_.open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyOpenFiles");
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
RETURN_NOT_OK(TryCloseLargestFile());
break;
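Backpressure here is reported as events, not spans: an event marks an instant on an already-open span, which fits a throttle rejection that has no duration of its own, while the Push/Pop spans added earlier in this file capture how long rows sit staged. The dual posting that the reviewer questions, sketched with the macros as used in this hunk:

// Sketch: a throttle rejection recorded as point-in-time events.
if (!backpressure.is_finished()) {
  // Visible on the scheduler's top-level span...
  EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyRowsQueued");
  // ...and on whatever span is current at the call site. Whether both
  // postings are needed is exactly the open review question above.
  EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued");
}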
21 changes: 4 additions & 17 deletions cpp/src/arrow/dataset/file_csv.cc
@@ -278,10 +278,6 @@ static inline Result<csv::ReadOptions> GetReadOptions(
static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(
auto fragment_scan_options,
GetFragmentScanOptions<CsvFragmentScanOptions>(
@@ -300,31 +296,24 @@
// input->Peek call blocks so we run the whole thing on the I/O thread pool.
auto reader_fut = DeferNotOk(input->io_context().executor()->Submit(
[=]() -> Future<std::shared_ptr<csv::StreamingReader>> {

ARROW_ASSIGN_OR_RAISE(auto first_block, input->Peek(reader_options.block_size));
const auto& parse_options = format.parse_options;
ARROW_ASSIGN_OR_RAISE(
auto convert_options,
GetConvertOptions(format, scan_options ? scan_options.get() : nullptr,
first_block));
return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
cpu_executor, reader_options,
parse_options, convert_options);
return csv::StreamingReader::MakeAsync(
io::default_io_context(), std::move(input), cpu_executor, reader_options,
parse_options, convert_options);
}));
return reader_fut.Then(
// Adds the filename to the error
[=](const std::shared_ptr<csv::StreamingReader>& reader)
-> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return reader;
},
[=](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(err, span.get());
span->End();
#endif
return err.WithMessage("Could not open CSV input source '", path, "': ", err);
});
}
@@ -384,8 +373,6 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
auto reader_fut =
OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
return generator;
}

7 changes: 7 additions & 0 deletions cpp/src/arrow/ipc/reader.cc
@@ -57,6 +57,7 @@
#include "arrow/util/parallel.h"
#include "arrow/util/string.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/ubsan.h"
#include "arrow/util/vector.h"
#include "arrow/visit_type_inline.h"
@@ -523,6 +524,12 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op

return ::arrow::internal::OptionalParallelFor(
options.use_threads, static_cast<int>(buffers.size()), [&](int i) {
util::tracing::Span span;
START_SPAN(span, "arrow::ipc::DecompressBuffer",
{{"buffer_index", i},
{"ipc.compression.codec", codec.get()->name().c_str()},
{"ipc.options.use_threads", options.use_threads},
{"size.uncompressed", (*buffers[i])->size() - sizeof(int64_t)}});
ARROW_ASSIGN_OR_RAISE(*buffers[i],
DecompressBuffer(*buffers[i], options, codec.get()));
return Status::OK();
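Each buffer decompression gets its own span inside the OptionalParallelFor callback, so when use_threads is set the decompressions appear as sibling spans under the read. A distilled sketch; DecompressOne is a hypothetical stand-in, and the sizeof(int64_t) subtraction reflects the 8-byte length prefix Arrow IPC stores at the front of each compressed buffer:

return ::arrow::internal::OptionalParallelFor(
    options.use_threads, static_cast<int>(buffers.size()), [&](int i) {
      arrow::util::tracing::Span span;
      START_SPAN(span, "arrow::ipc::DecompressBuffer",
                 {{"buffer_index", i},
                  {"size.uncompressed", (*buffers[i])->size() - sizeof(int64_t)}});
      return DecompressOne(i);  // hypothetical per-buffer decompression
    });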