From 3f248500bed86254ef55958f97ae4d502df82d19 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 13 Feb 2023 19:25:54 +0100 Subject: [PATCH 01/46] Added low-level file I/O spans --- cpp/src/arrow/util/io_util.cc | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 742865424d1..ff840aa76c9 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -32,6 +32,7 @@ #endif #include "arrow/util/windows_compatibility.h" // IWYU pragma: keep +#include "arrow/util/tracing_internal.h" #include #include @@ -1626,6 +1627,13 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { HANDLE handle = reinterpret_cast(_get_osfhandle(fd)); #endif int64_t total_bytes_read = 0; + ::arrow::util::tracing::Span span; + START_SPAN(span, "FileRead", + { + {"nbytes", nbytes}, + {"fd", fd} + }); + while (total_bytes_read < nbytes) { const int64_t chunksize = @@ -1664,6 +1672,12 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes) { int64_t bytes_read = 0; + ::arrow::util::tracing::Span span; + START_SPAN(span, "FileReadAt", + { + {"nbytes", nbytes}, + {"fd", fd} + }); while (bytes_read < nbytes) { int64_t chunksize = @@ -1690,6 +1704,12 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) { int64_t bytes_written = 0; + ::arrow::util::tracing::Span span; + START_SPAN(span, "FileWrite", + { + {"nbytes", nbytes}, + {"fd", fd} + }); while (bytes_written < nbytes) { const int64_t chunksize = From d106c4201143353a4dfc9eee71c6004a8f817121 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 13 Feb 2023 19:26:40 +0100 Subject: [PATCH 02/46] Added spans to datasetwriter for popstagedbatches and the actual IO write task --- cpp/src/arrow/dataset/dataset_writer.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index b1ebb660d6e..d33cbbe5875 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -155,6 +155,8 @@ class DatasetWriterFileQueue { Result> PopStagedBatch() { std::vector> batches_to_write; uint64_t num_rows = 0; + util::tracing::Span span; + START_SPAN(span, "DatasetWriter::PopStagedBatch"); while (!staged_batches_.empty()) { std::shared_ptr next = std::move(staged_batches_.front()); staged_batches_.pop_front(); @@ -232,6 +234,8 @@ class DatasetWriterFileQueue { return DeferNotOk(options_.filesystem->io_context().executor()->Submit( [self = this, batch = std::move(next)]() { int64_t rows_to_release = batch->num_rows(); + util::tracing::Span span; + START_SPAN(span, "DatasetWriter::WriteNext", {{"threadpool", "IO"}}); Status status = self->writer_->Write(batch); self->writer_state_->rows_in_flight_throttle.Release(rows_to_release); return status; From 8aba9161715f8412de6a0e437df4557b52a825a7 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 13 Feb 2023 19:27:23 +0100 Subject: [PATCH 03/46] Added a span for writing parquet. This is a rather high-level span for Parquet, there could be many underneath to show individual parts like encoding and compression. 
--- cpp/src/parquet/arrow/writer.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 6d22f318f6b..331153731de 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -16,6 +16,7 @@ // under the License. #include "parquet/arrow/writer.h" +#include "arrow/util/tracing_internal.h" #include #include @@ -130,6 +131,8 @@ class ArrowColumnWriterV2 { // // Columns are written in DFS order. Status Write(ArrowWriteContext* ctx) { + ::arrow::util::tracing::Span span; + START_SPAN(span, "ArrowColumnWriterV2::Write", {{"threadpool", "IO"}}); for (int leaf_idx = 0; leaf_idx < leaf_count_; leaf_idx++) { ColumnWriter* column_writer; if (row_group_writer_->buffered()) { From f9da7ad6ee478a17a6c5c29016b01e5b0ad2ab11 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 13 Feb 2023 19:29:28 +0100 Subject: [PATCH 04/46] Removed ReadBatch spans because they were orphans and didn't represent real work. Also, they were not linked in any way to the next() spans in the scanner even though they should be. --- cpp/src/arrow/compute/exec/source_node.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index ffb19d2e106..466e2a09eeb 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -139,9 +139,6 @@ struct SourceNode : ExecNode, public TracedNode { } lock.unlock(); - util::tracing::Span fetch_batch_span; - auto fetch_batch_scope = - START_SCOPED_SPAN(fetch_batch_span, "SourceNode::ReadBatch"); return generator_().Then( [this]( const std::optional& morsel_or_end) -> Future> { @@ -153,7 +150,7 @@ struct SourceNode : ExecNode, public TracedNode { SliceAndDeliverMorsel(*morsel_or_end); lock.lock(); if (!backpressure_future_.is_finished()) { - EVENT_ON_CURRENT_SPAN("SourceNode::BackpressureApplied"); + EVENT_ON_CURRENT_SPAN("SourceNode::BackpressureApplied"); //TODO: This should probably be posted to the SourceNode::DatasetScan span but we may need to do that manually because we don't know if that span is active return backpressure_future_.Then( []() -> ControlFlow { return Continue(); }); } From 10a80d60220f58998594972283e1fab6a7e41bb4 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 12:04:16 +0100 Subject: [PATCH 05/46] File I/O spans now have actual read bytes as property --- cpp/src/arrow/util/io_util.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index ff840aa76c9..ed9978aa2d3 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1630,7 +1630,6 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { ::arrow::util::tracing::Span span; START_SPAN(span, "FileRead", { - {"nbytes", nbytes}, {"fd", fd} }); @@ -1667,6 +1666,11 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { buffer += bytes_read; total_bytes_read += bytes_read; } +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + raw_span->SetAttribute("bytes_read", total_bytes_read); +#endif return total_bytes_read; } @@ -1675,7 +1679,6 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb ::arrow::util::tracing::Span span; START_SPAN(span, "FileReadAt", { - {"nbytes", nbytes}, {"fd", fd} }); @@ -1695,6 +1698,11 @@ Result 
FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb position += ret; bytes_read += ret; } +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + raw_span->SetAttribute("bytes_read", bytes_read); +#endif return bytes_read; } From ab34c0878f25cad622c12ace1b1a6faa4be1d647 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 12:05:47 +0100 Subject: [PATCH 06/46] Update span name --- cpp/src/parquet/arrow/reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 5b39de93d9c..8b9eeb9c9e2 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -286,7 +286,7 @@ class FileReaderImpl : public FileReader { std::string phys_type = TypeToString(reader_->metadata()->schema()->Column(i)->physical_type()); ::arrow::util::tracing::Span span; - START_SPAN(span, "parquet::arrow::read_column", + START_SPAN(span, "parquet::arrow::ReadColumn", {{"parquet.arrow.columnindex", i}, {"parquet.arrow.columnname", column_name}, {"parquet.arrow.physicaltype", phys_type}, From 4e4858149dcafa2dbaa5efe05239f34c42a078cc Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 12:06:24 +0100 Subject: [PATCH 07/46] Added spans to ipc compression functions because they are called by a ParallelFor --- cpp/src/arrow/ipc/reader.cc | 7 +++++++ cpp/src/arrow/ipc/writer.cc | 7 +++++++ cpp/src/parquet/arrow/writer.cc | 2 +- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index a1b17afaaf9..4ab6e743c5d 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -56,6 +56,7 @@ #include "arrow/util/parallel.h" #include "arrow/util/string.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/ubsan.h" #include "arrow/util/vector.h" #include "arrow/visit_type_inline.h" @@ -503,6 +504,12 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op return ::arrow::internal::OptionalParallelFor( options.use_threads, static_cast(buffers.size()), [&](int i) { + ::arrow::util::tracing::Span span; + START_SPAN(span, "arrow::ipc::DecompressBuffers", + {{"buffer_index", i}, + {"ipc.compression.codec", codec.get()->name().c_str()}, + {"ipc.options.use_threads", options.use_threads}, + {"size.uncompressed", (*buffers[i])->size() - sizeof(int64_t)}}); ARROW_ASSIGN_OR_RAISE(*buffers[i], DecompressBuffer(*buffers[i], options, codec.get())); return Status::OK(); diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index dfd390349c5..29cba1f05fc 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -54,6 +54,7 @@ #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/parallel.h" +#include "arrow/util/tracing_internal.h" #include "arrow/visit_array_inline.h" #include "arrow/visit_type_inline.h" @@ -198,6 +199,12 @@ class RecordBatchSerializer { auto CompressOne = [&](size_t i) { if (out_->body_buffers[i]->size() > 0) { + ::arrow::util::tracing::Span span; + START_SPAN(span, "arrow::ipc::CompressBuffers", + {{"buffer_index", i}, + {"ipc.compression.codec", options_.codec.get()->name().c_str()}, + {"ipc.options.use_threads", options_.use_threads}, + {"size.uncompressed", out_->body_buffers[i]->size()}}); RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i],
options_.codec.get(), &out_->body_buffers[i])); } diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 331153731de..f281726fe71 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -132,7 +132,7 @@ class ArrowColumnWriterV2 { // Columns are written in DFS order. Status Write(ArrowWriteContext* ctx) { ::arrow::util::tracing::Span span; - START_SPAN(span, "ArrowColumnWriterV2::Write", {{"threadpool", "IO"}}); + START_SPAN(span, "parquet::arrow::ArrowColumnWriterV2::Write"); for (int leaf_idx = 0; leaf_idx < leaf_count_; leaf_idx++) { ColumnWriter* column_writer; if (row_group_writer_->buffered()) { From 1a1307a66ca64c0c71d5a331e42d806e632479df Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 12:24:13 +0100 Subject: [PATCH 08/46] Added a span to a parallelFor lambda in parquet: GetRecordBatchReader::NextBatch --- cpp/src/parquet/arrow/reader.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 8b9eeb9c9e2..50b4054ac66 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -1037,7 +1037,11 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( reader_properties_.use_threads(), static_cast(readers.size()), - [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); })); + [&](int i) { + ::arrow::util::tracing::Span span; + START_SPAN(span, "parquet::arrow::GetRecordBatchReader::NextBatch"); + return readers[i]->NextBatch(batch_size, &columns[i]); + })); for (const auto& column : columns) { if (column == nullptr || column->length() == 0) { From 56b8535ba28f7cc0b125488cad72522d1ff8dd94 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 12:37:59 +0100 Subject: [PATCH 09/46] Creating a single span for a projection instead of 1 for each expression --- cpp/src/arrow/compute/exec/project_node.cc | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 3bff2d6ee3d..20b22c75f30 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -79,12 +79,16 @@ class ProjectNode : public MapNode { Result ProcessBatch(ExecBatch batch) override { std::vector values{exprs_.size()}; + util::tracing::Span span; + START_COMPUTE_SPAN(span, "Project", {{"project.length", batch.length }}); for (size_t i = 0; i < exprs_.size(); ++i) { - util::tracing::Span span; - START_COMPUTE_SPAN(span, "Project", - {{"project.type", exprs_[i].type()->ToString()}, - {"project.length", batch.length}, - {"project.expression", exprs_[i].ToString()}}); +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + std::string project_name = "project[" + std::to_string(i) + "]"; + raw_span->SetAttribute(project_name + ".type", exprs_[i].type()->ToString()); + raw_span->SetAttribute(project_name + ".expression", exprs_[i].ToString()); +#endif ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], batch.guarantee)); From 56c52af14aec9b5298549ff09ec24733c16cb9b0 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 14:39:58 +0100 Subject: [PATCH 10/46] Added span type attributes to task submission spans. 
These spans are logical, because the actual work does not necessarily start right away. --- cpp/src/arrow/util/async_util.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 0a59a462c95..79985c481c6 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -131,12 +131,15 @@ ::arrow::internal::tracing::Scope TraceTaskSubmitted(AsyncTaskScheduler::Task* t } return START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(), - {{"task.cost", task->cost()}}); + {{"task.cost", task->cost()}, + {"span.type", "AsyncTask"}}); } void TraceTaskQueued(AsyncTaskScheduler::Task* task, const util::tracing::Span& parent) { START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(), - {{"task.cost", task->cost()}}); + {{"task.cost", task->cost()}, + {"span.type", "AsyncTask"}, + {"task.queued", true}}); } void TraceTaskFinished(AsyncTaskScheduler::Task* task) { END_SPAN(task->span); } From 815d31770714258bdc9f0382cff30ddc9b7a1cf0 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 14:44:30 +0100 Subject: [PATCH 11/46] No longer creating a span when task submission is queued (throttled) Those spans became duplicates in practice, instead of having an event added to them when they were submitted. During that time the (possibly multiple!) duplicates would remain active until the task was finally finished. Instead, we are now adding an event to the scheduler's span that says task submission was throttled. --- cpp/src/arrow/util/async_util.cc | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 79985c481c6..5f78d0c958d 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -125,23 +125,11 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue { #ifdef ARROW_WITH_OPENTELEMETRY ::arrow::internal::tracing::Scope TraceTaskSubmitted(AsyncTaskScheduler::Task* task, const util::tracing::Span& parent) { - if (task->span.valid()) { - EVENT(task->span, "task submitted"); - return ACTIVATE_SPAN(task->span); - } - return START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(), {{"task.cost", task->cost()}, {"span.type", "AsyncTask"}}); } -void TraceTaskQueued(AsyncTaskScheduler::Task* task, const util::tracing::Span& parent) { - START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(), - {{"task.cost", task->cost()}, - {"span.type", "AsyncTask"}, - {"task.queued", true}}); -} - void TraceTaskFinished(AsyncTaskScheduler::Task* task) { END_SPAN(task->span); } void TraceSchedulerAbort(const Status& error) { EVENT_ON_CURRENT_SPAN(error.ToString()); } @@ -309,7 +297,9 @@ class ThrottledAsyncTaskSchedulerImpl std::optional> maybe_backoff = throttle_->TryAcquire(latched_cost); if (maybe_backoff) { #ifdef ARROW_WITH_OPENTELEMETRY - TraceTaskQueued(task.get(), span()); + EVENT(span(), "Task submission throttled", { + {"task.name", task->name()}, + {"task.cost", task->cost()}}); #endif queue_->Push(std::move(task)); lk.unlock(); From 56c52af14aec9b5298549ff09ec24733c16cb9b0 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 16 Feb 2023 14:48:40 +0100 Subject: [PATCH 12/46] Creating a string_view from the task name's basic_string_view --- cpp/src/arrow/util/async_util.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 
5f78d0c958d..c60a0bcce17 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -298,7 +298,7 @@ class ThrottledAsyncTaskSchedulerImpl if (maybe_backoff) { #ifdef ARROW_WITH_OPENTELEMETRY EVENT(span(), "Task submission throttled", { - {"task.name", task->name()}, + {"task.name", ::opentelemetry::nostd::string_view(task->name().data(), task->name().size())}, {"task.cost", task->cost()}}); #endif queue_->Push(std::move(task)); From 1f3adee5b0dcf48bc202ff17e46bbe7ae5f562f5 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 17 Feb 2023 17:09:17 +0100 Subject: [PATCH 13/46] Added the size of the output to some spans. This allows calculating average throughput of a task. --- cpp/src/arrow/compute/exec/filter_node.cc | 20 ++++++++++++--- cpp/src/arrow/compute/exec/project_node.cc | 8 +++++- cpp/src/arrow/compute/exec/util.cc | 8 ++++-- cpp/src/arrow/csv/reader.cc | 29 ++++++++++++++++++++++ cpp/src/arrow/dataset/dataset_writer.cc | 13 +++++++++- cpp/src/parquet/arrow/reader.cc | 12 ++++++++- 6 files changed, 82 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index b6877f106dc..6b42a6f69c6 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -71,12 +71,16 @@ class FilterNode : public MapNode { Result ProcessBatch(ExecBatch batch) override { ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, SimplifyWithGuarantee(filter_, batch.guarantee)); - +#ifdef ARROW_WITH_OPENTELEMETRY util::tracing::Span span; START_COMPUTE_SPAN(span, "Filter", {{"filter.expression", ToStringExtra()}, {"filter.expression.simplified", simplified_filter.ToString()}, - {"filter.length", batch.length}}); + {"filter.length", batch.length}, + {"input_batch.size_bytes", batch.TotalBufferSize()}}); + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); +#endif ARROW_ASSIGN_OR_RAISE( Datum mask, ExecuteScalarExpression(simplified_filter, batch, @@ -85,8 +89,14 @@ class FilterNode : public MapNode { if (mask.is_scalar()) { const auto& mask_scalar = mask.scalar_as(); if (mask_scalar.is_valid && mask_scalar.value) { +#ifdef ARROW_WITH_OPENTELEMETRY + raw_span->SetAttribute("output_batch.size_bytes", batch.TotalBufferSize()); +#endif return batch; } +#ifdef ARROW_WITH_OPENTELEMETRY + raw_span->SetAttribute("output_batch.size_bytes", 0); +#endif return batch.Slice(0, 0); } @@ -99,7 +109,11 @@ class FilterNode : public MapNode { if (value.is_scalar()) continue; ARROW_ASSIGN_OR_RAISE(value, Filter(value, mask, FilterOptions::Defaults())); } - return ExecBatch::Make(std::move(values)); + auto filtered_batch = ExecBatch::Make(std::move(values)); +#ifdef ARROW_WITH_OPENTELEMETRY + raw_span->SetAttribute("output_batch.size_bytes", filtered_batch->TotalBufferSize()); +#endif + return filtered_batch; } protected: diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 20b22c75f30..61f8a5979c9 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -80,7 +80,8 @@ class ProjectNode : public MapNode { Result ProcessBatch(ExecBatch batch) override { std::vector values{exprs_.size()}; util::tracing::Span span; - START_COMPUTE_SPAN(span, "Project", {{"project.length", batch.length }}); + START_COMPUTE_SPAN(span, "Project", {{"project.length", batch.length }, + {"input_batch.size_bytes", batch.TotalBufferSize()}}); for (size_t i 
= 0; i < exprs_.size(); ++i) { #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::nostd::shared_ptr raw_span = @@ -96,6 +97,11 @@ class ProjectNode : public MapNode { values[i], ExecuteScalarExpression(simplified_expr, batch, plan()->query_context()->exec_context())); } +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + raw_span->SetAttribute("output_batch.size_bytes", batch.TotalBufferSize()); +#endif return ExecBatch{std::move(values), batch.length}; } diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index 752f8cac764..1cd450ea7da 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -426,14 +426,18 @@ void TracedNode::NoteStartProducing(std::string extra_details) const { util::tracing::Span span; return START_SCOPED_SPAN( span, node_kind + "::InputReceived", - {{"node.label", node_->label()}, {"node.batch_length", batch.length}}); + {{"node.label", node_->label()}, + {"node.batch_length", batch.length}, + {"batch.size_bytes", batch.TotalBufferSize()}}); } void TracedNode::NoteInputReceived(const ExecBatch& batch) const { std::string node_kind(node_->kind_name()); EVENT_ON_CURRENT_SPAN( node_kind + "::InputReceived", - {{"node.label", node_->label()}, {"node.batch_length", batch.length}}); + {{"node.label", node_->label()}, + {"node.batch_length", batch.length}, + {"batch.size_bytes", batch.TotalBufferSize()}}); } [[nodiscard]] ::arrow::internal::tracing::Scope TracedNode::TraceFinish() const { diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index fdc7fcb1380..1323d7ef93a 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -43,12 +43,14 @@ #include "arrow/type.h" #include "arrow/type_fwd.h" #include "arrow/util/async_generator.h" +#include "arrow/util/byte_size.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/utf8_internal.h" #include "arrow/util/vector.h" @@ -401,6 +403,8 @@ class BlockParsingOperator { num_rows_seen_(first_row) {} Result operator()(const CSVBlock& block) { + util::tracing::Span span; + START_SPAN(span, "arrow::csv::BlockParsingOperator"); constexpr int32_t max_num_rows = std::numeric_limits::max(); auto parser = std::make_shared( io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); @@ -431,6 +435,11 @@ class BlockParsingOperator { num_rows_seen_ += parser->total_num_rows(); } RETURN_NOT_OK(block.consume_bytes(parsed_size)); +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + raw_span->SetAttribute("parsed_size", parsed_size); +#endif return ParsedBlock{std::move(parser), block.block_index, static_cast(parsed_size) + block.bytes_skipped}; } @@ -448,6 +457,8 @@ class BlockParsingOperator { class BlockDecodingOperator { public: Future operator()(const ParsedBlock& block) { + util::tracing::Span span; + START_SPAN(span, "arrow::csv::BlockDecodingOperator"); DCHECK(!state_->column_decoders.empty()); std::vector>> decoded_array_futs; for (auto& decoder : state_->column_decoders) { @@ -456,6 +467,22 @@ class BlockDecodingOperator { auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; auto decoded_arrays_fut = 
All(std::move(decoded_array_futs)); auto state = state_; +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + return decoded_arrays_fut.Then( + [state, bytes_parsed_or_skipped, raw_span]( + const std::vector>>& maybe_decoded_arrays) + -> Result { + ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, + arrow::internal::UnwrapOrRaise(maybe_decoded_arrays)); + + ARROW_ASSIGN_OR_RAISE(auto batch, + state->DecodedArraysToBatch(std::move(decoded_arrays))); + raw_span->SetAttribute("arrow.csv.output_batch_size_bytes", util::TotalBufferSize(*batch)); + return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; + }); +#else return decoded_arrays_fut.Then( [state, bytes_parsed_or_skipped]( const std::vector>>& maybe_decoded_arrays) @@ -465,8 +492,10 @@ class BlockDecodingOperator { ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(std::move(decoded_arrays))); + raw_span->SetAttribute("parsed_size", parsed_size); return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; }); +#endif } static Result Make(io::IOContext io_context, diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index d33cbbe5875..56601752258 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -26,6 +26,7 @@ #include "arrow/record_batch.h" #include "arrow/result.h" #include "arrow/table.h" +#include "arrow/util/byte_size.h" #include "arrow/util/future.h" #include "arrow/util/logging.h" #include "arrow/util/map.h" @@ -234,8 +235,18 @@ class DatasetWriterFileQueue { return DeferNotOk(options_.filesystem->io_context().executor()->Submit( [self = this, batch = std::move(next)]() { int64_t rows_to_release = batch->num_rows(); +#ifdef ARROW_WITH_OPENTELEMETRY + uint64_t size_bytes = util::TotalBufferSize(*batch); + uint64_t num_buffers = 0; + for (auto column: batch->columns()) { + num_buffers += column->data()->buffers.size(); + } util::tracing::Span span; - START_SPAN(span, "DatasetWriter::WriteNext", {{"threadpool", "IO"}}); + START_SPAN(span, "DatasetWriter::WriteNext", + {{"threadpool", "IO"}, + {"batch.size_bytes", size_bytes}, + {"batch.num_buffers", num_buffers}}); +#endif Status status = self->writer_->Write(batch); self->writer_state_->rows_in_flight_throttle.Release(rows_to_release); return status; diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 50b4054ac66..57b422e79ad 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -33,6 +33,7 @@ #include "arrow/type.h" #include "arrow/util/async_generator.h" #include "arrow/util/bit_util.h" +#include "arrow/util/byte_size.h" #include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" @@ -291,8 +292,17 @@ class FileReaderImpl : public FileReader { {"parquet.arrow.columnname", column_name}, {"parquet.arrow.physicaltype", phys_type}, {"parquet.arrow.records_to_read", records_to_read}}); -#endif + + auto status = reader->NextBatch(records_to_read, out); + + uint64_t size_bytes = ::arrow::util::TotalBufferSize(*out->get()); + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + raw_span->SetAttribute("parquet.arrow.output_batch_size_bytes", size_bytes); + return status; +#else return reader->NextBatch(records_to_read, out); +#endif END_PARQUET_CATCH_EXCEPTIONS } From 43a7fba44ba796c64c09bdf6e16888e730ee86b0 Mon Sep 17 00:00:00 2001 From: 
Joost Hoozemans Date: Fri, 17 Feb 2023 17:11:14 +0100 Subject: [PATCH 14/46] Added (presumably missing) START_SPAN to OrderBySinkNode --- cpp/src/arrow/compute/exec/sink_node.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 0b179cd6cd8..4c640f75962 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -479,6 +479,7 @@ struct OrderBySinkNode final : public SinkNode { Status Finish() override { util::tracing::Span span; + START_SPAN(span, std::string(kind_name()) + "::Finish"); ARROW_RETURN_NOT_OK(DoFinish()); return SinkNode::Finish(); } From af56a271921fdfde78b9edbd497990a6ba2ac60c Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 27 Feb 2023 16:30:40 +0100 Subject: [PATCH 15/46] Removed unintended line --- cpp/src/arrow/csv/reader.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 1323d7ef93a..4a013bd5603 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -492,7 +492,6 @@ class BlockDecodingOperator { ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(std::move(decoded_arrays))); - raw_span->SetAttribute("parsed_size", parsed_size); return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; }); #endif From 24b5ff81afde1aba16a29c6cf7b1d9329602ebda Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 27 Feb 2023 19:42:11 +0100 Subject: [PATCH 16/46] Do not create toplevel task span at task submission --- cpp/src/arrow/util/async_util.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index c60a0bcce17..7c26ebc5b79 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -194,7 +194,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { if (!submit_result->TryAddCallback([this, task_inner = std::move(task)]() mutable { return [this, task_inner2 = std::move(task_inner)](const Status& st) { #ifdef ARROW_WITH_OPENTELEMETRY - TraceTaskFinished(task_inner2.get()); + //TraceTaskFinished(task_inner2.get()); #endif OnTaskFinished(st); }; @@ -245,7 +245,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { // It's important that the task's span be active while we run the submit function. // Normally the submit function should transfer the span to the thread task as the // active span. - auto scope = TraceTaskSubmitted(task.get(), span_); + //auto scope = TraceTaskSubmitted(task.get(), span_); #endif running_tasks_++; lk.unlock(); From 37e84e42b5dc386c48876e4d837e00d155f703a0 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 28 Feb 2023 15:01:55 +0100 Subject: [PATCH 17/46] Removed arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next span in favor of a ReadNextAsync span The former was not capturing the actual work performed, looking at profiling output from e.g. VTune. 
--- cpp/src/arrow/csv/reader.cc | 2 ++ cpp/src/arrow/dataset/file_csv.cc | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 4a013bd5603..21c9a82e37a 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -909,6 +909,8 @@ class StreamingReaderImpl : public ReaderMixin, } Future> ReadNextAsync() override { + util::tracing::Span span; + START_SPAN(span, "arrow::csv::ReadNextAsync"); return record_batch_gen_(); } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 122e7f79708..59fe995542a 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -384,8 +384,6 @@ Result CsvFileFormat::ScanBatchesAsync( auto reader_fut = OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool()); auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size); - WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN( - generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next"); return generator; } From ed397fc486c8b3e182b3afc21ebf1ad8a4d076cb Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 28 Feb 2023 22:08:55 +0100 Subject: [PATCH 18/46] Added the size of the batch outputted by CSV reader to the span. Note that the output size as reported by the arrow::csv::ReadNextAsync span corresponds to the output size of the arrow::csv::BlockDecodingOperator child of its *predecessor. The first block of data is already processed during initialization (captured by the arrow::csv::InitAfterFirstBuffer span). --- cpp/src/arrow/csv/reader.cc | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 21c9a82e37a..57e0677d1de 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -911,7 +911,22 @@ class StreamingReaderImpl : public ReaderMixin, Future> ReadNextAsync() override { util::tracing::Span span; START_SPAN(span, "arrow::csv::ReadNextAsync"); - return record_batch_gen_(); + auto future = record_batch_gen_(); +#ifdef ARROW_WITH_OPENTELEMETRY + auto longer_living_span = std::make_unique(std::move(span)); + future.AddCallback([span = std::move(longer_living_span)](const arrow::Result>& result){ + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span->details.get()); + if (result.ok()) { + auto result_batch = result.ValueOrDie(); + if (result_batch) { + raw_span->SetAttribute("batch.size_byte", ::arrow::util::TotalBufferSize(*result_batch)); + } + } + END_SPAN((*span)); + }); +#endif + return future; } protected: @@ -922,6 +937,9 @@ class StreamingReaderImpl : public ReaderMixin, return Status::Invalid("Empty CSV file"); } + util::tracing::Span span; + START_SPAN(span, "arrow::csv::InitAfterFirstBuffer"); + std::shared_ptr after_header; ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, ProcessHeader(first_buffer, &after_header)); @@ -941,9 +959,12 @@ class StreamingReaderImpl : public ReaderMixin, auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); auto self = shared_from_this(); - return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) { - return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); + auto init_finished = rb_gen().Then([self, rb_gen, max_readahead, span = std::move(span)](const DecodedBlock& first_block) { + auto fut = self->InitFromBlock(first_block, 
std::move(rb_gen), max_readahead, 0); + END_SPAN(span); + return fut; }); + return init_finished; } Future<> InitFromBlock(const DecodedBlock& block, From 2f7fa9706694f41e6e27689f30061993c240e2e1 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 1 Mar 2023 15:04:27 +0100 Subject: [PATCH 19/46] Added span for processing the first block. It is named after the ReadNextAsync span so we can do a groupby using that name and get all the actual work performed for reading CSV. --- cpp/src/arrow/dataset/file_csv.cc | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 59fe995542a..ab2df9d0432 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -280,6 +280,7 @@ static inline Future> OpenReaderAsync( const std::shared_ptr& scan_options, Executor* cpu_executor) { #ifdef ARROW_WITH_OPENTELEMETRY auto tracer = arrow::internal::tracing::GetTracer(); + // This span also captures task submission, possibly including wait time auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync"); #endif ARROW_ASSIGN_OR_RAISE( @@ -300,15 +301,24 @@ static inline Future> OpenReaderAsync( // input->Peek call blocks so we run the whole thing on the I/O thread pool. auto reader_fut = DeferNotOk(input->io_context().executor()->Submit( [=]() -> Future> { + util::tracing::Span span; + // Name this span after the the one in scanner.cc:ReadNext(), + // because that allows grouping all the CSV reading work by that span name + START_SPAN(span, "arrow::csv::ReadNextAsync", {{"threadpool", "IO"}}); + ARROW_ASSIGN_OR_RAISE(auto first_block, input->Peek(reader_options.block_size)); const auto& parse_options = format.parse_options; ARROW_ASSIGN_OR_RAISE( auto convert_options, GetConvertOptions(format, scan_options ? 
scan_options.get() : nullptr, first_block)); - return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input), - cpu_executor, reader_options, - parse_options, convert_options); + auto fut = csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input), + cpu_executor, reader_options, + parse_options, convert_options); + return fut.Then([span = std::move(span)](const std::shared_ptr& reader){ + END_SPAN(span); + return reader; + }); })); return reader_fut.Then( // Adds the filename to the error From 38c4639eaf5bf9b02e9ed304e66b30f4e691332c Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 1 Mar 2023 15:56:06 +0100 Subject: [PATCH 20/46] Creating explicit ProcessMorsel and DatasetScan spans --- cpp/src/arrow/compute/exec/source_node.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 466e2a09eeb..38153bddfd7 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -87,6 +87,8 @@ struct SourceNode : ExecNode, public TracedNode { } plan_->query_context()->ScheduleTask( [=]() { + util::tracing::Span span; + START_SPAN(span, "SourceNode::ProcessMorsel"); int64_t offset = 0; do { int64_t batch_size = @@ -107,6 +109,7 @@ struct SourceNode : ExecNode, public TracedNode { Status StartProducing() override { NoteStartProducing(ToStringExtra()); + { // If another exec node encountered an error during its StartProducing call // it might have already called StopProducing on all of its inputs (including this @@ -128,6 +131,9 @@ struct SourceNode : ExecNode, public TracedNode { options.should_schedule = ShouldSchedule::IfDifferentExecutor; ARROW_ASSIGN_OR_RAISE(Future<> scan_task, plan_->query_context()->BeginExternalTask( "SourceNode::DatasetScan")); + util::tracing::Span span; + START_SPAN(span, "SourceNode::DatasetScan"); + if (!scan_task.is_valid()) { // Plan has already been aborted, no need to start scanning return Status::OK(); From 8764d340794400e42414c2b0c7c54dec5e78ca5a Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 2 Mar 2023 11:43:37 +0100 Subject: [PATCH 21/46] Added span for keeping track of re-chunking during scanning --- cpp/src/arrow/dataset/dataset_internal.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index aac437df075..57b4eda3bcb 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -29,8 +29,10 @@ #include "arrow/scalar.h" #include "arrow/type.h" #include "arrow/util/async_generator.h" +#include "arrow/util/byte_size.h" #include "arrow/util/checked_cast.h" #include "arrow/util/iterator.h" +#include "arrow/util/tracing_internal.h" namespace arrow { namespace dataset { @@ -144,6 +146,12 @@ inline RecordBatchGenerator MakeChunkedBatchGenerator(RecordBatchGenerator gen, [batch_size](const std::shared_ptr& batch) -> ::arrow::AsyncGenerator> { const int64_t rows = batch->num_rows(); + util::tracing::Span span; + START_SPAN(span, "MakeChunkedBatchGenerator", { + {"target_batch_size_rows", batch_size}, + {"batch.size_rows", rows}, + {"batch.size_bytes", util::TotalBufferSize(*batch)}, + {"output_batches", rows / batch_size + (rows % batch_size != 0)}}); if (rows <= batch_size) { return ::arrow::MakeVectorGenerator>({batch}); } From 58c9db4fec637a7cfd749a3537c39da1ff78870b Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 2 Mar 2023 14:31:11 
+0100 Subject: [PATCH 22/46] Reverting back to PeekAndMakeAsync, because the span does sometimes include async wait and is not equivalent to a ReadNextAsync. --- cpp/src/arrow/dataset/file_csv.cc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index ab2df9d0432..418373ffeb2 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -301,10 +301,8 @@ static inline Future> OpenReaderAsync( // input->Peek call blocks so we run the whole thing on the I/O thread pool. auto reader_fut = DeferNotOk(input->io_context().executor()->Submit( [=]() -> Future> { - util::tracing::Span span; - // Name this span after the the one in scanner.cc:ReadNext(), - // because that allows grouping all the CSV reading work by that span name - START_SPAN(span, "arrow::csv::ReadNextAsync", {{"threadpool", "IO"}}); + util::tracing::Span lambda_span; + START_SPAN(lambda_span, "arrow::csv::PeekAndMakeAsync", {{"threadpool", "IO"}}); ARROW_ASSIGN_OR_RAISE(auto first_block, input->Peek(reader_options.block_size)); const auto& parse_options = format.parse_options; @@ -315,8 +313,8 @@ static inline Future> OpenReaderAsync( auto fut = csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input), cpu_executor, reader_options, parse_options, convert_options); - return fut.Then([span = std::move(span)](const std::shared_ptr& reader){ - END_SPAN(span); + return fut.Then([lambda_span = std::move(lambda_span)](const std::shared_ptr& reader){ + END_SPAN(lambda_span); return reader; }); })); From ee28b213f81a8ba7ce72eb7b6c8ffd9e49338b4e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 2 Mar 2023 14:31:43 +0100 Subject: [PATCH 23/46] Posting datasetwriter backpressure events to the asyncscheduler span --- cpp/src/arrow/dataset/dataset_writer.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 56601752258..222416e981d 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -612,13 +612,21 @@ class DatasetWriter::DatasetWriterImpl { backpressure = writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows()); if (!backpressure.is_finished()) { - EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued"); +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr scheduler_span = + ::arrow::internal::tracing::UnwrapSpan(scheduler_->span().details.get()); + scheduler_span->AddEvent("DatasetWriter::Backpressure::TooManyRowsQueued"); +#endif break; } if (will_open_file) { backpressure = writer_state_.open_files_throttle.Acquire(1); if (!backpressure.is_finished()) { - EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles"); +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr scheduler_span = + ::arrow::internal::tracing::UnwrapSpan(scheduler_->span().details.get()); + scheduler_span->AddEvent("DatasetWriter::Backpressure::TooManyOpenFiles"); +#endif RETURN_NOT_OK(CloseLargestFile()); break; } From 1163ef1896962d9988536f51dae95cbec8b9a11f Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 2 Mar 2023 16:11:46 +0100 Subject: [PATCH 24/46] Added push and pop spans to the datasetwriter that allow tracking staging area --- cpp/src/arrow/dataset/dataset_writer.cc | 45 ++++++++++++++++--------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git 
a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 222416e981d..ea34f5c6a98 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -156,8 +156,6 @@ class DatasetWriterFileQueue { Result> PopStagedBatch() { std::vector> batches_to_write; uint64_t num_rows = 0; - util::tracing::Span span; - START_SPAN(span, "DatasetWriter::PopStagedBatch"); while (!staged_batches_.empty()) { std::shared_ptr next = std::move(staged_batches_.front()); staged_batches_.pop_front(); @@ -193,18 +191,35 @@ class DatasetWriterFileQueue { } Result PopAndDeliverStagedBatch() { + util::tracing::Span span; + START_SPAN(span, "DatasetWriter::Pop"); ARROW_ASSIGN_OR_RAISE(std::shared_ptr next_batch, PopStagedBatch()); int64_t rows_popped = next_batch->num_rows(); rows_currently_staged_ -= next_batch->num_rows(); +#ifdef ARROW_WITH_OPENTELEMETRY + ::arrow::internal::tracing::UnwrapSpan(span.details.get())->SetAttribute("batch.size_rows", next_batch->num_rows()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get())->SetAttribute("rows_currently_staged", rows_currently_staged_); +#endif ScheduleBatch(std::move(next_batch)); return rows_popped; } // Stage batches, popping and delivering batches if enough data has arrived Status Push(std::shared_ptr batch) { - uint64_t delta_staged = batch->num_rows(); - rows_currently_staged_ += delta_staged; - staged_batches_.push_back(std::move(batch)); + uint64_t delta_staged = batch->num_rows(); + rows_currently_staged_ += delta_staged; + { + util::tracing::Span span; + START_SPAN(span, "DatasetWriter::Push", { + {"batch.size_rows", batch->num_rows()}, + {"rows_currently_staged", rows_currently_staged_}, + // staged_rows_count is updated at the end, after this push and possibly multiple pops +// {"staged_rows_count", writer_state_->staged_rows_count}, + {"options_.min_rows_per_group", options_.min_rows_per_group}, + {"max_rows_staged", writer_state_->max_rows_staged} + }); + staged_batches_.push_back(std::move(batch)); + } while (!staged_batches_.empty() && (writer_state_->StagingFull() || rows_currently_staged_ >= options_.min_rows_per_group)) { @@ -354,6 +369,8 @@ class DatasetWriterDirectoryQueue { latest_open_file_tasks_ = util::MakeThrottledAsyncTaskGroup( scheduler_, 1, /*queue=*/nullptr, std::move(file_finish_task)); if (init_future_.is_valid()) { + util::tracing::Span span; + START_SPAN(span, "arrow::dataset::WaitForDirectoryInit"); latest_open_file_tasks_->AddSimpleTask( [init_future = init_future_]() { return init_future; }, "DatasetWriter::WaitForDirectoryInit"sv); @@ -365,6 +382,8 @@ class DatasetWriterDirectoryQueue { uint64_t rows_written() const { return rows_written_; } void PrepareDirectory() { + util::tracing::Span span; + START_SPAN(span, "arrow::dataset::SubmitPrepareDirectoryTask"); if (directory_.empty() || !write_options_.create_dir) { return; } @@ -386,6 +405,8 @@ class DatasetWriterDirectoryQueue { if (write_options_.existing_data_behavior == ExistingDataBehavior::kDeleteMatchingPartitions) { init_task = [this, create_dir_cb, notify_waiters_cb, notify_waiters_on_err_cb] { + util::tracing::Span span; + START_SPAN(span, "arrow::dataset::PrepareDirectory"); return write_options_.filesystem ->DeleteDirContentsAsync(directory_, /*missing_dir_ok=*/true) @@ -612,21 +633,15 @@ class DatasetWriter::DatasetWriterImpl { backpressure = writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows()); if (!backpressure.is_finished()) { -#ifdef ARROW_WITH_OPENTELEMETRY - 
opentelemetry::nostd::shared_ptr scheduler_span = - ::arrow::internal::tracing::UnwrapSpan(scheduler_->span().details.get()); - scheduler_span->AddEvent("DatasetWriter::Backpressure::TooManyRowsQueued"); -#endif + EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyRowsQueued"); + EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyRowsQueued"); break; } if (will_open_file) { backpressure = writer_state_.open_files_throttle.Acquire(1); if (!backpressure.is_finished()) { -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr scheduler_span = - ::arrow::internal::tracing::UnwrapSpan(scheduler_->span().details.get()); - scheduler_span->AddEvent("DatasetWriter::Backpressure::TooManyOpenFiles"); -#endif + EVENT(scheduler_->span(), "DatasetWriter::Backpressure::TooManyOpenFiles"); + EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles"); RETURN_NOT_OK(CloseLargestFile()); break; } From 07343d7e6e313e5e9530be4633536dc0518507f5 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 2 Mar 2023 16:16:04 +0100 Subject: [PATCH 25/46] Formatting --- cpp/src/arrow/compute/exec/filter_node.cc | 2 +- cpp/src/arrow/compute/exec/project_node.cc | 17 +++++---- cpp/src/arrow/compute/exec/source_node.cc | 8 +++- cpp/src/arrow/compute/exec/util.cc | 18 ++++----- cpp/src/arrow/csv/reader.cc | 43 +++++++++++++--------- cpp/src/arrow/dataset/dataset_internal.h | 10 ++--- cpp/src/arrow/dataset/dataset_writer.cc | 32 ++++++++-------- cpp/src/arrow/dataset/file_csv.cc | 9 +++-- cpp/src/arrow/util/async_util.cc | 17 +++++---- cpp/src/arrow/util/io_util.cc | 23 +++--------- cpp/src/arrow/util/tracing_internal.h | 3 +- cpp/src/parquet/arrow/reader.cc | 7 ++-- 12 files changed, 98 insertions(+), 91 deletions(-) diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 6b42a6f69c6..faef1ef987b 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -79,7 +79,7 @@ class FilterNode : public MapNode { {"filter.length", batch.length}, {"input_batch.size_bytes", batch.TotalBufferSize()}}); opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); #endif ARROW_ASSIGN_OR_RAISE( diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 61f8a5979c9..ef858bce5cf 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -80,15 +80,16 @@ class ProjectNode : public MapNode { Result ProcessBatch(ExecBatch batch) override { std::vector values{exprs_.size()}; util::tracing::Span span; - START_COMPUTE_SPAN(span, "Project", {{"project.length", batch.length }, - {"input_batch.size_bytes", batch.TotalBufferSize()}}); + START_COMPUTE_SPAN(span, "Project", + {{"project.length", batch.length}, + {"input_batch.size_bytes", batch.TotalBufferSize()}}); for (size_t i = 0; i < exprs_.size(); ++i) { #ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); - std::string project_name = "project[" + std::to_string(i) + "]"; - raw_span->SetAttribute(project_name + ".type", exprs_[i].type()->ToString()); - raw_span->SetAttribute(project_name + ".expression", exprs_[i].ToString()); + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + std::string project_name = 
"project[" + std::to_string(i) + "]"; + raw_span->SetAttribute(project_name + ".type", exprs_[i].type()->ToString()); + raw_span->SetAttribute(project_name + ".expression", exprs_[i].ToString()); #endif ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], batch.guarantee)); @@ -99,7 +100,7 @@ class ProjectNode : public MapNode { } #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); raw_span->SetAttribute("output_batch.size_bytes", batch.TotalBufferSize()); #endif return ExecBatch{std::move(values), batch.length}; diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 38153bddfd7..69b4f24d10d 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -156,7 +156,13 @@ struct SourceNode : ExecNode, public TracedNode { SliceAndDeliverMorsel(*morsel_or_end); lock.lock(); if (!backpressure_future_.is_finished()) { - EVENT_ON_CURRENT_SPAN("SourceNode::BackpressureApplied"); //TODO: This should probably be posted to the SourceNode::DatasetScan span but we may need to do that manually because we don't know if that span is active + EVENT_ON_CURRENT_SPAN( + "SourceNode::BackpressureApplied"); // TODO: This should probably be + // posted to the + // SourceNode::DatasetScan span but + // we may need to do that manually + // because we don't know if that + // span is active return backpressure_future_.Then( []() -> ControlFlow { return Continue(); }); } diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index 1cd450ea7da..dccdc5cc22f 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -424,20 +424,18 @@ void TracedNode::NoteStartProducing(std::string extra_details) const { const ExecBatch& batch) const { std::string node_kind(node_->kind_name()); util::tracing::Span span; - return START_SCOPED_SPAN( - span, node_kind + "::InputReceived", - {{"node.label", node_->label()}, - {"node.batch_length", batch.length}, - {"batch.size_bytes", batch.TotalBufferSize()}}); + return START_SCOPED_SPAN(span, node_kind + "::InputReceived", + {{"node.label", node_->label()}, + {"node.batch_length", batch.length}, + {"batch.size_bytes", batch.TotalBufferSize()}}); } void TracedNode::NoteInputReceived(const ExecBatch& batch) const { std::string node_kind(node_->kind_name()); - EVENT_ON_CURRENT_SPAN( - node_kind + "::InputReceived", - {{"node.label", node_->label()}, - {"node.batch_length", batch.length}, - {"batch.size_bytes", batch.TotalBufferSize()}}); + EVENT_ON_CURRENT_SPAN(node_kind + "::InputReceived", + {{"node.label", node_->label()}, + {"node.batch_length", batch.length}, + {"batch.size_bytes", batch.TotalBufferSize()}}); } [[nodiscard]] ::arrow::internal::tracing::Scope TracedNode::TraceFinish() const { diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 57e0677d1de..0204bcca07a 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -437,7 +437,7 @@ class BlockParsingOperator { RETURN_NOT_OK(block.consume_bytes(parsed_size)); #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); raw_span->SetAttribute("parsed_size", parsed_size); #endif return ParsedBlock{std::move(parser), 
block.block_index, @@ -469,7 +469,7 @@ class BlockDecodingOperator { auto state = state_; #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); return decoded_arrays_fut.Then( [state, bytes_parsed_or_skipped, raw_span]( const std::vector>>& maybe_decoded_arrays) @@ -479,7 +479,8 @@ class BlockDecodingOperator { ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(std::move(decoded_arrays))); - raw_span->SetAttribute("arrow.csv.output_batch_size_bytes", util::TotalBufferSize(*batch)); + raw_span->SetAttribute("arrow.csv.output_batch_size_bytes", + util::TotalBufferSize(*batch)); return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; }); #else @@ -914,17 +915,20 @@ class StreamingReaderImpl : public ReaderMixin, auto future = record_batch_gen_(); #ifdef ARROW_WITH_OPENTELEMETRY auto longer_living_span = std::make_unique(std::move(span)); - future.AddCallback([span = std::move(longer_living_span)](const arrow::Result>& result){ - opentelemetry::nostd::shared_ptr raw_span = + future.AddCallback( + [span = std::move(longer_living_span)]( + const arrow::Result>& result) { + opentelemetry::nostd::shared_ptr raw_span = ::arrow::internal::tracing::UnwrapSpan(span->details.get()); - if (result.ok()) { - auto result_batch = result.ValueOrDie(); - if (result_batch) { - raw_span->SetAttribute("batch.size_byte", ::arrow::util::TotalBufferSize(*result_batch)); - } - } - END_SPAN((*span)); - }); + if (result.ok()) { + auto result_batch = result.ValueOrDie(); + if (result_batch) { + raw_span->SetAttribute("batch.size_byte", + ::arrow::util::TotalBufferSize(*result_batch)); + } + } + END_SPAN((*span)); + }); #endif return future; } @@ -959,11 +963,14 @@ class StreamingReaderImpl : public ReaderMixin, auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); auto self = shared_from_this(); - auto init_finished = rb_gen().Then([self, rb_gen, max_readahead, span = std::move(span)](const DecodedBlock& first_block) { - auto fut = self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); - END_SPAN(span); - return fut; - }); + auto init_finished = + rb_gen().Then([self, rb_gen, max_readahead, + span = std::move(span)](const DecodedBlock& first_block) { + auto fut = + self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); + END_SPAN(span); + return fut; + }); return init_finished; } diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 57b4eda3bcb..1c206017290 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -147,11 +147,11 @@ inline RecordBatchGenerator MakeChunkedBatchGenerator(RecordBatchGenerator gen, -> ::arrow::AsyncGenerator> { const int64_t rows = batch->num_rows(); util::tracing::Span span; - START_SPAN(span, "MakeChunkedBatchGenerator", { - {"target_batch_size_rows", batch_size}, - {"batch.size_rows", rows}, - {"batch.size_bytes", util::TotalBufferSize(*batch)}, - {"output_batches", rows / batch_size + (rows % batch_size != 0)}}); + START_SPAN(span, "MakeChunkedBatchGenerator", + {{"target_batch_size_rows", batch_size}, + {"batch.size_rows", rows}, + {"batch.size_bytes", util::TotalBufferSize(*batch)}, + {"output_batches", rows / batch_size + (rows % batch_size != 0)}}); if (rows <= batch_size) { return ::arrow::MakeVectorGenerator>({batch}); } diff --git 
a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index ea34f5c6a98..c7f15cf8a8b 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -197,8 +197,10 @@ class DatasetWriterFileQueue { int64_t rows_popped = next_batch->num_rows(); rows_currently_staged_ -= next_batch->num_rows(); #ifdef ARROW_WITH_OPENTELEMETRY - ::arrow::internal::tracing::UnwrapSpan(span.details.get())->SetAttribute("batch.size_rows", next_batch->num_rows()); - ::arrow::internal::tracing::UnwrapSpan(span.details.get())->SetAttribute("rows_currently_staged", rows_currently_staged_); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()) + ->SetAttribute("batch.size_rows", next_batch->num_rows()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()) + ->SetAttribute("rows_currently_staged", rows_currently_staged_); #endif ScheduleBatch(std::move(next_batch)); return rows_popped; @@ -206,18 +208,18 @@ class DatasetWriterFileQueue { // Stage batches, popping and delivering batches if enough data has arrived Status Push(std::shared_ptr batch) { - uint64_t delta_staged = batch->num_rows(); - rows_currently_staged_ += delta_staged; + uint64_t delta_staged = batch->num_rows(); + rows_currently_staged_ += delta_staged; { util::tracing::Span span; - START_SPAN(span, "DatasetWriter::Push", { - {"batch.size_rows", batch->num_rows()}, - {"rows_currently_staged", rows_currently_staged_}, - // staged_rows_count is updated at the end, after this push and possibly multiple pops -// {"staged_rows_count", writer_state_->staged_rows_count}, - {"options_.min_rows_per_group", options_.min_rows_per_group}, - {"max_rows_staged", writer_state_->max_rows_staged} - }); + START_SPAN(span, "DatasetWriter::Push", + {{"batch.size_rows", batch->num_rows()}, + {"rows_currently_staged", rows_currently_staged_}, + // staged_rows_count is updated at the end, after this push and possibly + // multiple pops + // {"staged_rows_count", writer_state_->staged_rows_count}, + {"options_.min_rows_per_group", options_.min_rows_per_group}, + {"max_rows_staged", writer_state_->max_rows_staged}}); staged_batches_.push_back(std::move(batch)); } while (!staged_batches_.empty() && @@ -253,14 +255,14 @@ class DatasetWriterFileQueue { #ifdef ARROW_WITH_OPENTELEMETRY uint64_t size_bytes = util::TotalBufferSize(*batch); uint64_t num_buffers = 0; - for (auto column: batch->columns()) { + for (auto column : batch->columns()) { num_buffers += column->data()->buffers.size(); } util::tracing::Span span; START_SPAN(span, "DatasetWriter::WriteNext", {{"threadpool", "IO"}, - {"batch.size_bytes", size_bytes}, - {"batch.num_buffers", num_buffers}}); + {"batch.size_bytes", size_bytes}, + {"batch.num_buffers", num_buffers}}); #endif Status status = self->writer_->Write(batch); self->writer_state_->rows_in_flight_throttle.Release(rows_to_release); diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 418373ffeb2..cdffc98e475 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -310,10 +310,11 @@ static inline Future> OpenReaderAsync( auto convert_options, GetConvertOptions(format, scan_options ? 
scan_options.get() : nullptr, first_block)); - auto fut = csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input), - cpu_executor, reader_options, - parse_options, convert_options); - return fut.Then([lambda_span = std::move(lambda_span)](const std::shared_ptr& reader){ + auto fut = csv::StreamingReader::MakeAsync( + io::default_io_context(), std::move(input), cpu_executor, reader_options, + parse_options, convert_options); + return fut.Then([lambda_span = std::move(lambda_span)]( + const std::shared_ptr& reader) { END_SPAN(lambda_span); return reader; }); diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 7c26ebc5b79..a98fbef93e0 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -125,9 +125,9 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue { #ifdef ARROW_WITH_OPENTELEMETRY ::arrow::internal::tracing::Scope TraceTaskSubmitted(AsyncTaskScheduler::Task* task, const util::tracing::Span& parent) { - return START_SCOPED_SPAN_WITH_PARENT_SV(task->span, parent, task->name(), - {{"task.cost", task->cost()}, - {"span.type", "AsyncTask"}}); + return START_SCOPED_SPAN_WITH_PARENT_SV( + task->span, parent, task->name(), + {{"task.cost", task->cost()}, {"span.type", "AsyncTask"}}); } void TraceTaskFinished(AsyncTaskScheduler::Task* task) { END_SPAN(task->span); } @@ -194,7 +194,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { if (!submit_result->TryAddCallback([this, task_inner = std::move(task)]() mutable { return [this, task_inner2 = std::move(task_inner)](const Status& st) { #ifdef ARROW_WITH_OPENTELEMETRY - //TraceTaskFinished(task_inner2.get()); + // TraceTaskFinished(task_inner2.get()); #endif OnTaskFinished(st); }; @@ -245,7 +245,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { // It's important that the task's span be active while we run the submit function. // Normally the submit function should transfer the span to the thread task as the // active span. 
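// A minimal sketch of the pattern described in the comment above (illustrative only;
// `ScheduleTraced` and `body` are placeholder names, not part of this patch). It assumes
// "arrow/util/thread_pool.h", "arrow/util/tracing_internal.h" and <functional>:
Status ScheduleTraced(::arrow::internal::Executor* executor, std::function<void()> body) {
  // Capture the span that is active on the submitting thread.
  auto parent = ::arrow::internal::tracing::GetTracer()->GetCurrentSpan();
  return executor->Spawn([parent, body = std::move(body)]() {
    // Re-activate the captured span; spans and events created inside body()
    // now attach to `parent` instead of becoming orphans.
    auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(parent);
    body();
  });
}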
- //auto scope = TraceTaskSubmitted(task.get(), span_); + // auto scope = TraceTaskSubmitted(task.get(), span_); #endif running_tasks_++; lk.unlock(); @@ -297,9 +297,10 @@ class ThrottledAsyncTaskSchedulerImpl std::optional> maybe_backoff = throttle_->TryAcquire(latched_cost); if (maybe_backoff) { #ifdef ARROW_WITH_OPENTELEMETRY - EVENT(span(), "Task submission throttled", { - {"task.name", ::opentelemetry::nostd::string_view(task->name().data(), task->name().size())}, - {"task.cost", task->cost()}}); + EVENT(span(), "Task submission throttled", + {{"task.name", ::opentelemetry::nostd::string_view(task->name().data(), + task->name().size())}, + {"task.cost", task->cost()}}); #endif queue_->Push(std::move(task)); lk.unlock(); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index ed9978aa2d3..737abe5692e 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -31,8 +31,8 @@ #define __EXTENSIONS__ #endif -#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep #include "arrow/util/tracing_internal.h" +#include "arrow/util/windows_compatibility.h" // IWYU pragma: keep #include #include @@ -1628,11 +1628,7 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { #endif int64_t total_bytes_read = 0; ::arrow::util::tracing::Span span; - START_SPAN(span, "FileRead", - { - {"fd", fd} - }); - + START_SPAN(span, "FileRead", {{"fd", fd}}); while (total_bytes_read < nbytes) { const int64_t chunksize = @@ -1668,7 +1664,7 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { } #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); raw_span->SetAttribute("bytes_read", total_bytes_read); #endif return total_bytes_read; @@ -1677,10 +1673,7 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes) { int64_t bytes_read = 0; ::arrow::util::tracing::Span span; - START_SPAN(span, "FileReadAt", - { - {"fd", fd} - }); + START_SPAN(span, "FileReadAt", {{"fd", fd}}); while (bytes_read < nbytes) { int64_t chunksize = @@ -1700,7 +1693,7 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb } #ifdef ARROW_WITH_OPENTELEMETRY opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); raw_span->SetAttribute("bytes_read", bytes_read); #endif return bytes_read; @@ -1713,11 +1706,7 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) { int64_t bytes_written = 0; ::arrow::util::tracing::Span span; - START_SPAN(span, "FileWrite", - { - {"nbytes", nbytes}, - {"fd", fd} - }); + START_SPAN(span, "FileWrite", {{"nbytes", nbytes}, {"fd", fd}}); while (bytes_written < nbytes) { const int64_t chunksize = diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index a031edf08dc..9d8e6d63531 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -72,10 +72,11 @@ AsyncGenerator WrapAsyncGenerator(AsyncGenerator wrapped, return [=]() mutable -> Future { auto span = active_span; auto scope = GetTracer()->WithActiveSpan(active_span); - auto fut = wrapped(); if (create_childspan) { span = GetTracer()->StartSpan(span_name); + scope = 
GetTracer()->WithActiveSpan(span); } + auto fut = wrapped(); fut.AddCallback([span](const Result& result) { MarkSpan(result.status(), span.get()); span->End(); diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 57b422e79ad..9cdee0b7763 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -291,13 +291,14 @@ class FileReaderImpl : public FileReader { {{"parquet.arrow.columnindex", i}, {"parquet.arrow.columnname", column_name}, {"parquet.arrow.physicaltype", phys_type}, - {"parquet.arrow.records_to_read", records_to_read}}); + { "parquet.arrow.records_to_read", + records_to_read }}); auto status = reader->NextBatch(records_to_read, out); uint64_t size_bytes = ::arrow::util::TotalBufferSize(*out->get()); opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); + ::arrow::internal::tracing::UnwrapSpan(span.details.get()); raw_span->SetAttribute("parquet.arrow.output_batch_size_bytes", size_bytes); return status; #else @@ -1048,7 +1049,7 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector& row_groups, RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( reader_properties_.use_threads(), static_cast(readers.size()), [&](int i) { - ::arrow::util::tracing::Span span; + ::arrow::util::tracing::Span span; START_SPAN(span, "parquet::arrow::GetRecordBatchReader::NextBatch"); return readers[i]->NextBatch(batch_size, &columns[i]); })); From a3a126d656952c918e986caec0e082d38df5d334 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 17 Mar 2023 16:55:59 +0100 Subject: [PATCH 26/46] Fixed typo in span attribute --- cpp/src/arrow/csv/reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 0204bcca07a..9782fa0625f 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -923,7 +923,7 @@ class StreamingReaderImpl : public ReaderMixin, if (result.ok()) { auto result_batch = result.ValueOrDie(); if (result_batch) { - raw_span->SetAttribute("batch.size_byte", + raw_span->SetAttribute("batch.size_bytes", ::arrow::util::TotalBufferSize(*result_batch)); } } From e1a0a0f13e105cd051ccc6a363d047f76d4f2460 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 17 Mar 2023 21:40:39 +0100 Subject: [PATCH 27/46] Added a surrogate arrow::csv::ReadNextAsync span for the first block of a fragment. This prevents the work getting lost when grouping by that span name. --- cpp/src/arrow/csv/reader.cc | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 9782fa0625f..ecf8ff2e656 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -941,8 +941,14 @@ class StreamingReaderImpl : public ReaderMixin, return Status::Invalid("Empty CSV file"); } - util::tracing::Span span; - START_SPAN(span, "arrow::csv::InitAfterFirstBuffer"); + util::tracing::Span init_span; + START_SPAN(init_span, "arrow::csv::InitAfterFirstBuffer"); + + // Create a arrow::csv::ReadNextAsync span so that grouping by that name does not ignore + // the work performed for this first block. Especially when Fragments consists of small numers of blocks, + // this fraction can be very significant (if block size == fragment size, the first block is all of the work!) 
+ util::tracing::Span read_span; + auto scope = START_SCOPED_SPAN(read_span, "arrow::csv::ReadNextAsync"); std::shared_ptr after_header; ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, @@ -964,11 +970,20 @@ class StreamingReaderImpl : public ReaderMixin, auto self = shared_from_this(); auto init_finished = - rb_gen().Then([self, rb_gen, max_readahead, - span = std::move(span)](const DecodedBlock& first_block) { + rb_gen().Then([self, rb_gen, max_readahead +#ifdef ARROW_WITH_OPENTELEMETRY + , init_span = std::move(init_span), read_span = std::move(read_span) +#endif + ](const DecodedBlock& first_block) { auto fut = self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); - END_SPAN(span); +#ifdef ARROW_WITH_OPENTELEMETRY + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(read_span.details.get()); + raw_span->SetAttribute("batch.size_bytes", util::TotalBufferSize(*first_block.record_batch)); +#endif + END_SPAN(read_span); + END_SPAN(init_span); return fut; }); return init_finished; From 6a9b635f1b7fb43af8f379b63356f08783889825 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 17 Mar 2023 22:57:41 +0100 Subject: [PATCH 28/46] WIP writing some more extensive documentation about tracing Acero --- docs/source/cpp/opentelemetry.rst | 76 +++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/docs/source/cpp/opentelemetry.rst b/docs/source/cpp/opentelemetry.rst index db6c0ac4469..91858d00b3d 100644 --- a/docs/source/cpp/opentelemetry.rst +++ b/docs/source/cpp/opentelemetry.rst @@ -85,3 +85,79 @@ at http://localhost:16686. Note that unlike with other methods of exporting traces, no output will be made to stdout/stderr. However, if you tail your Docker container logs, you should see output when traces are received by the all-in-one container. + +Note that the volume of spans produced by Acero can quickly become overwhelming +for many tracing frameworks. Several spans are produced per input +file, input batch, internal chunk of data (called Morsel, consisting of 128k +rows by default) and per output file (possibly also divided by columns). +In practice, this means that for each MB of data processed by Acero, it will +produce 10 - 20 spans. Choose a suitably sized dataset that strikes a balance +between being representative for the workload, but not too large to be +inspected with (or even ingested by!) a span visualizer such as Jaeger. + +Additional background on tracing +-------------------------------- +Traces produced by Acero are conceptually similar to information produced by +using profiling tools, but they are not the same. +For example, the spans by Acero do not necessarily follow the structure of the +code, like in case of the call-stacks and flame-graphs produced by profiling. +The spans aim to highlight: +- code sections that are performing significant work on the CPU +- code sections that perform I/O operations (reading/writing to disk) +- The way blocks of data flow through the execution graph +- The way data is being reorganized (e.g. a file being split into blocks) +Each span instance can have various attributes added to it when it is created. +This allows us to capture the exact size of each block of data and the amount +of time each node in the execution graph has spent on it. + +Logical/Physical spans +---------------------- +TODO: This concept needs to be defined further. 
For example, currently there exist spans that are not completely logical or physical: they include task submission (so they do not perform any work all the time), but also some actual work. What to do with these? (see next section)
+Traces are organized in a hierarchical fashion, where each span except the root
+span has parents and can have any number of children.
+If a span has a child span active during its lifetime, this usually means that
+this parent span is not actually in control of the CPU. Thus, calculating the
+total CPU time is not as easy as adding up all of the span durations; only the
+time that a span does not have any active children (this is often referred to
+as the "self-time") should count.
+However, Acero is a multi-threaded engine, so it is likely that there should
+in fact be multiple spans performing work on a CPU at any given time!
+
+To model this, we have created 2 types of spans; logical spans and physical spans.
+Logical spans do not perform work themselves, they are created to track the
+lifetime of a certain resource (like a scheduler or a block of data).
+Physical spans perform actual work. However, the self-time concept may apply,
+so it is still not possible to simply aggregate all physical spans to get the
+total CPU time! Aggregating all durations of top-level physical
+spans (whose parent is a logical span) should give meaningful data though. (TODO: verify)
+
+Asynchronous behavior
+---------------------
+Acero makes extensive use of asynchronous behavior. Many sections of code are
+executed through a task scheduling mechanism. When these tasks are scheduled,
+they can start execution immediately or some time in the future. If a span is
+active during that time, it will also track this task submission time. Tracking
+this can be interesting, to see if e.g. the start latency is high, but care must
+be taken to not aggregate this time into actual CPU processing time.
+
+
+Backpressure
+------------
+When a node in the execution graph is receiving more data than it can process,
+it can ask its preceding nodes to slow down. This process is called
+"backpressure". Reasons for this can include, for example:
+- the buffer capacity for the node is almost full
+- the maximum number of concurrently open files is reached
+Relevant events such as a node applying/releasing backpressure, or an async task
+group/scheduler throttling task submission, are posted as events to their
+logical span (i.e. the long-running span representing the lifetime of that
+scheduler/taskgroup) and can also be posted to the "local" span (that tracks
+the submission of the block of data that caused the event).
+
+
+Performing analyses on traces
+-----------------------------
+The durations and additional attributes of each span allow various analyses
+to be performed on them.
This includes: +- Calculating the average throughput of a certain + From a28a1db093c921419b88c6f16e54d832d30c628e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 2 Jun 2023 14:15:17 +0200 Subject: [PATCH 29/46] Added helper function ATTRIBUTE_ON_CURRENT_SPAN --- cpp/src/arrow/acero/filter_node.cc | 16 +++------------- cpp/src/arrow/acero/project_node.cc | 14 +++----------- cpp/src/arrow/csv/reader.cc | 6 +----- cpp/src/arrow/util/io_util.cc | 12 ++---------- cpp/src/arrow/util/tracing_internal.h | 3 +++ cpp/src/parquet/arrow/reader.cc | 4 +--- 6 files changed, 13 insertions(+), 42 deletions(-) diff --git a/cpp/src/arrow/acero/filter_node.cc b/cpp/src/arrow/acero/filter_node.cc index 99b1d83a303..86178e81908 100644 --- a/cpp/src/arrow/acero/filter_node.cc +++ b/cpp/src/arrow/acero/filter_node.cc @@ -79,10 +79,6 @@ class FilterNode : public MapNode { {"filter.expression.simplified", simplified_filter.ToString()}, {"filter.length", batch.length}, {"input_batch.size_bytes", batch.TotalBufferSize()}}); -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); -#endif ARROW_ASSIGN_OR_RAISE( Datum mask, ExecuteScalarExpression(simplified_filter, batch, @@ -91,14 +87,10 @@ class FilterNode : public MapNode { if (mask.is_scalar()) { const auto& mask_scalar = mask.scalar_as(); if (mask_scalar.is_valid && mask_scalar.value) { -#ifdef ARROW_WITH_OPENTELEMETRY - raw_span->SetAttribute("output_batch.size_bytes", batch.TotalBufferSize()); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", batch.TotalBufferSize()); return batch; } -#ifdef ARROW_WITH_OPENTELEMETRY - raw_span->SetAttribute("output_batch.size_bytes", 0); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", 0); return batch.Slice(0, 0); } @@ -112,9 +104,7 @@ class FilterNode : public MapNode { ARROW_ASSIGN_OR_RAISE(value, Filter(value, mask, FilterOptions::Defaults())); } auto filtered_batch = ExecBatch::Make(std::move(values)); -#ifdef ARROW_WITH_OPENTELEMETRY - raw_span->SetAttribute("output_batch.size_bytes", filtered_batch->TotalBufferSize()); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", filtered_batch->TotalBufferSize()); return filtered_batch; } diff --git a/cpp/src/arrow/acero/project_node.cc b/cpp/src/arrow/acero/project_node.cc index 8a244b656e0..3a1da4f5d4e 100644 --- a/cpp/src/arrow/acero/project_node.cc +++ b/cpp/src/arrow/acero/project_node.cc @@ -84,13 +84,9 @@ class ProjectNode : public MapNode { {{"project.length", batch.length}, {"input_batch.size_bytes", batch.TotalBufferSize()}}); for (size_t i = 0; i < exprs_.size(); ++i) { -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); std::string project_name = "project[" + std::to_string(i) + "]"; - raw_span->SetAttribute(project_name + ".type", exprs_[i].type()->ToString()); - raw_span->SetAttribute(project_name + ".expression", exprs_[i].ToString()); -#endif + ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".type", exprs_[i].type()->ToString()); + ATTRIBUTE_ON_CURRENT_SPAN(project_name + ".expression", exprs_[i].ToString()); ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], batch.guarantee)); @@ -98,11 +94,7 @@ class ProjectNode : public MapNode { values[i], ExecuteScalarExpression(simplified_expr, batch, plan()->query_context()->exec_context())); } -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - 
::arrow::internal::tracing::UnwrapSpan(span.details.get()); - raw_span->SetAttribute("output_batch.size_bytes", batch.TotalBufferSize()); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", batch.TotalBufferSize()); return ExecBatch{std::move(values), batch.length}; } diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index ecf8ff2e656..77fb67b94f5 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -435,11 +435,7 @@ class BlockParsingOperator { num_rows_seen_ += parser->total_num_rows(); } RETURN_NOT_OK(block.consume_bytes(parsed_size)); -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); - raw_span->SetAttribute("parsed_size", parsed_size); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("parsed_size", parsed_size); return ParsedBlock{std::move(parser), block.block_index, static_cast(parsed_size) + block.bytes_skipped}; } diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 890f1f1a4f0..eea29294918 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1663,11 +1663,7 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { buffer += bytes_read; total_bytes_read += bytes_read; } -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); - raw_span->SetAttribute("bytes_read", total_bytes_read); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("bytes_read", total_bytes_read); return total_bytes_read; } @@ -1692,11 +1688,7 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb position += ret; bytes_read += ret; } -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); - raw_span->SetAttribute("bytes_read", bytes_read); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("bytes_read", bytes_read); return bytes_read; } diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 9d8e6d63531..aebb6688afe 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -179,6 +179,9 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( #define EVENT_ON_CURRENT_SPAN(...) \ ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()->AddEvent(__VA_ARGS__) +#define ATTRIBUTE_ON_CURRENT_SPAN(...) \ + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()->SetAttribute(__VA_ARGS__) + #define EVENT(target_span, ...) 
\ ::arrow::internal::tracing::UnwrapSpan(target_span.details.get())->AddEvent(__VA_ARGS__) diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 9cdee0b7763..7c08d055e62 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -297,9 +297,7 @@ class FileReaderImpl : public FileReader { auto status = reader->NextBatch(records_to_read, out); uint64_t size_bytes = ::arrow::util::TotalBufferSize(*out->get()); - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); - raw_span->SetAttribute("parquet.arrow.output_batch_size_bytes", size_bytes); + ATTRIBUTE_ON_CURRENT_SPAN("parquet.arrow.output_batch_size_bytes", size_bytes); return status; #else return reader->NextBatch(records_to_read, out); From 212967886e7c9f47116887cc0f0b82f5d5ff63ab Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 5 Jun 2023 14:16:05 +0200 Subject: [PATCH 30/46] Reverted commenting out task submission tracing --- cpp/src/arrow/util/async_util.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 278aeb2c047..f8c2111813b 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -194,7 +194,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { if (!submit_result->TryAddCallback([this, task_inner = std::move(task)]() mutable { return [this, task_inner2 = std::move(task_inner)](const Status& st) { #ifdef ARROW_WITH_OPENTELEMETRY - // TraceTaskFinished(task_inner2.get()); + TraceTaskFinished(task_inner2.get()); #endif OnTaskFinished(st); }; @@ -245,7 +245,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { // It's important that the task's span be active while we run the submit function. // Normally the submit function should transfer the span to the thread task as the // active span. 
- // auto scope = TraceTaskSubmitted(task.get(), span_); + auto scope = TraceTaskSubmitted(task.get(), span_); #endif running_tasks_++; lk.unlock(); From 06bcc1e1d4227c001fe76f718db37d5639d35213 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 5 Jun 2023 14:16:16 +0200 Subject: [PATCH 31/46] Fix typo --- cpp/src/arrow/ipc/writer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index ca232260059..a87485e69be 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -229,7 +229,7 @@ class RecordBatchSerializer { auto CompressOne = [&](size_t i) { if (out_->body_buffers[i]->size() > 0) { ::arrow::util::tracing::Span span; - START_SPAN(span, "arrow::ipc::DecompressBuffers", + START_SPAN(span, "arrow::ipc::CompressBuffers", {{"buffer_index", i}, {"ipc.compression.codec", options_.codec.get()->name().c_str()}, {"ipc.options.use_threads", options_.use_threads}, From c8fa9f6bc6c0a587925b4d2d24cdffd52391a658 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 5 Jun 2023 14:18:07 +0200 Subject: [PATCH 32/46] Using helper function in some more places --- cpp/src/arrow/dataset/dataset_writer.cc | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index da3071d0e31..937ec026aa3 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -197,12 +197,8 @@ class DatasetWriterFileQueue { ARROW_ASSIGN_OR_RAISE(std::shared_ptr next_batch, PopStagedBatch()); int64_t rows_popped = next_batch->num_rows(); rows_currently_staged_ -= next_batch->num_rows(); -#ifdef ARROW_WITH_OPENTELEMETRY - ::arrow::internal::tracing::UnwrapSpan(span.details.get()) - ->SetAttribute("batch.size_rows", next_batch->num_rows()); - ::arrow::internal::tracing::UnwrapSpan(span.details.get()) - ->SetAttribute("rows_currently_staged", rows_currently_staged_); -#endif + ATTRIBUTE_ON_CURRENT_SPAN("batch.size_rows", next_batch->num_rows()); + ATTRIBUTE_ON_CURRENT_SPAN("rows_currently_staged", rows_currently_staged_); ScheduleBatch(std::move(next_batch)); return rows_popped; } From d31526452dfffe9f297040ea71bf88c95caa8b6e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 11 Jul 2023 14:26:32 +0200 Subject: [PATCH 33/46] Removed some unused code from the datasetwriter --- cpp/src/arrow/dataset/dataset_writer.cc | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 937ec026aa3..03d155333e2 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -289,11 +289,6 @@ class DatasetWriterFileQueue { util::AsyncTaskScheduler* file_tasks_ = nullptr; }; -struct WriteTask { - std::string filename; - uint64_t num_rows; -}; - class DatasetWriterDirectoryQueue { public: DatasetWriterDirectoryQueue(util::AsyncTaskScheduler* scheduler, std::string directory, @@ -329,7 +324,6 @@ class DatasetWriterDirectoryQueue { Status StartWrite(const std::shared_ptr& batch) { rows_written_ += batch->num_rows(); - WriteTask task{current_filename_, static_cast(batch->num_rows())}; if (!latest_open_file_) { ARROW_RETURN_NOT_OK(OpenFileQueue(current_filename_)); } From 407bcb2704cff1746be67e8450444e4181114067 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 11 Jul 2023 17:57:13 +0200 Subject: [PATCH 34/46] Propagating span context to SimpleTask submission functions --- 
cpp/src/arrow/util/async_util.h | 52 ++++++++++++++++++++------- cpp/src/arrow/util/tracing_internal.h | 12 +++++-- 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 2668ae22260..2e76abff089 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -30,7 +30,7 @@ #include "arrow/util/iterator.h" #include "arrow/util/mutex.h" #include "arrow/util/thread_pool.h" -#include "arrow/util/tracing.h" +#include "arrow/util/tracing_internal.h" namespace arrow { @@ -142,17 +142,16 @@ class ARROW_EXPORT AsyncTaskScheduler { bool AddAsyncGenerator(std::function()> generator, std::function visitor, std::string_view name); - template struct SimpleTask : public Task { - SimpleTask(Callable callable, std::string_view name) + SimpleTask(FnOnce>()> callable, std::string_view name) : callable(std::move(callable)), name_(name) {} - SimpleTask(Callable callable, std::string name) + SimpleTask(FnOnce>()> callable, std::string name) : callable(std::move(callable)), owned_name_(std::move(name)) { name_ = *owned_name_; } - Result> operator()() override { return callable(); } + Result> operator()() override { return std::move(callable)(); } std::string_view name() const override { return name_; } - Callable callable; + FnOnce>()> callable; std::string_view name_; std::optional owned_name_; }; @@ -166,19 +165,46 @@ class ARROW_EXPORT AsyncTaskScheduler { /// future completes. It is used for debugging and tracing. /// /// \see AddTask for more details - template - bool AddSimpleTask(Callable callable, std::string_view name) { - return AddTask(std::make_unique>(std::move(callable), name)); + bool AddSimpleTask(FnOnce>()> callable, std::string_view name) { +#ifdef ARROW_WITH_OPENTELEMETRY + // Wrap the task to propagate a parent tracing span to it + struct SpanWrapper { + Result> operator()() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); + return std::move(func)(); + } + FnOnce>()> func; + opentelemetry::nostd::shared_ptr active_span; + }; + SpanWrapper wrapper{std::move(callable), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + return AddTask(std::make_unique(std::move(wrapper), name)); +#else + return AddTask(std::make_unique(std::move(callable), name)); +#endif } /// Add a task with cost 1 to the scheduler /// /// This is an overload of \see AddSimpleTask that keeps `name` alive /// in the task. 
- template - bool AddSimpleTask(Callable callable, std::string name) { - return AddTask( - std::make_unique>(std::move(callable), std::move(name))); + bool AddSimpleTask(FnOnce>()> callable, std::string name) { +#ifdef ARROW_WITH_OPENTELEMETRY + // Wrap the task to propagate a parent tracing span to it + struct SpanWrapper { + Result> operator()() { + auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); + return std::move(func)(); + } + FnOnce>()> func; + opentelemetry::nostd::shared_ptr active_span; + }; + SpanWrapper wrapper{std::move(callable), + ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; + return AddTask(std::make_unique(std::move(wrapper), std::move(name))); +#else + return AddTask(std::make_unique(std::move(callable), std::move(name))); +#endif } /// Construct a scheduler diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index aebb6688afe..04b96af24df 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -21,6 +21,7 @@ // Pick up ARROW_WITH_OPENTELEMETRY first #include "arrow/util/config.h" +#include "arrow/util/visibility.h" #ifdef ARROW_WITH_OPENTELEMETRY #ifdef _MSC_VER @@ -29,6 +30,14 @@ #endif #include #include +namespace arrow { +namespace internal { +namespace tracing { + ARROW_EXPORT + opentelemetry::trace::Tracer *GetTracer(); +} +} +} #ifdef _MSC_VER #pragma warning(pop) #endif @@ -38,15 +47,12 @@ #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/tracing.h" -#include "arrow/util/visibility.h" namespace arrow { namespace internal { namespace tracing { #ifdef ARROW_WITH_OPENTELEMETRY -ARROW_EXPORT -opentelemetry::trace::Tracer* GetTracer(); inline void MarkSpan(const Status& s, opentelemetry::trace::Span* span) { if (!s.ok()) { From ccdf2ec487aa3393af5a1b0b3838e2107a12b699 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 12 Jul 2023 17:29:19 +0200 Subject: [PATCH 35/46] Removed TODO, some typos --- cpp/src/arrow/acero/source_node.cc | 8 +------- cpp/src/arrow/csv/reader.cc | 3 +-- cpp/src/arrow/ipc/reader.cc | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index 8520bf9f3a6..236a3a528f8 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -212,13 +212,7 @@ struct SourceNode : ExecNode, public TracedNode { SliceAndDeliverMorsel(*morsel_or_end); lock.lock(); if (!backpressure_future_.is_finished()) { - EVENT_ON_CURRENT_SPAN( - "SourceNode::BackpressureApplied"); // TODO: This should probably be - // posted to the - // SourceNode::DatasetScan span but - // we may need to do that manually - // because we don't know if that - // span is active + EVENT_ON_CURRENT_SPAN("SourceNode::BackpressureApplied"); return backpressure_future_.Then( []() -> ControlFlow { return Continue(); }); } diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 77fb67b94f5..6e609e5c853 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -941,8 +941,7 @@ class StreamingReaderImpl : public ReaderMixin, START_SPAN(init_span, "arrow::csv::InitAfterFirstBuffer"); // Create a arrow::csv::ReadNextAsync span so that grouping by that name does not ignore - // the work performed for this first block. 
Especially when Fragments consists of small numers of blocks, - // this fraction can be very significant (if block size == fragment size, the first block is all of the work!) + // the work performed for this first block. util::tracing::Span read_span; auto scope = START_SCOPED_SPAN(read_span, "arrow::csv::ReadNextAsync"); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 900a428ed4f..5dfaaed7509 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -515,7 +515,7 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op return ::arrow::internal::OptionalParallelFor( options.use_threads, static_cast(buffers.size()), [&](int i) { ::arrow::util::tracing::Span span; - START_SPAN(span, "arrow::ipc::DecompressBuffers", + START_SPAN(span, "arrow::ipc::DecompressBuffer", {{"buffer_index", i}, {"ipc.compression.codec", codec.get()->name().c_str()}, {"ipc.options.use_threads", options.use_threads}, From 45f03faea9a6bb822e1ee706e00d64b17b81f29d Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 12 Jul 2023 17:50:46 +0200 Subject: [PATCH 36/46] Formatting --- cpp/src/arrow/acero/filter_node.cc | 3 ++- cpp/src/arrow/csv/reader.cc | 31 ++++++++++++++------------- cpp/src/arrow/util/async_util.h | 24 ++++++++++----------- cpp/src/arrow/util/tracing_internal.h | 10 ++++----- 4 files changed, 35 insertions(+), 33 deletions(-) diff --git a/cpp/src/arrow/acero/filter_node.cc b/cpp/src/arrow/acero/filter_node.cc index 86178e81908..1414f6108f4 100644 --- a/cpp/src/arrow/acero/filter_node.cc +++ b/cpp/src/arrow/acero/filter_node.cc @@ -104,7 +104,8 @@ class FilterNode : public MapNode { ARROW_ASSIGN_OR_RAISE(value, Filter(value, mask, FilterOptions::Defaults())); } auto filtered_batch = ExecBatch::Make(std::move(values)); - ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", filtered_batch->TotalBufferSize()); + ATTRIBUTE_ON_CURRENT_SPAN("output_batch.size_bytes", + filtered_batch->TotalBufferSize()); return filtered_batch; } diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 6e609e5c853..4e74ac5a094 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -940,8 +940,8 @@ class StreamingReaderImpl : public ReaderMixin, util::tracing::Span init_span; START_SPAN(init_span, "arrow::csv::InitAfterFirstBuffer"); - // Create a arrow::csv::ReadNextAsync span so that grouping by that name does not ignore - // the work performed for this first block. + // Create a arrow::csv::ReadNextAsync span so that grouping by that name does not + // ignore the work performed for this first block. 
util::tracing::Span read_span; auto scope = START_SCOPED_SPAN(read_span, "arrow::csv::ReadNextAsync"); @@ -964,23 +964,24 @@ class StreamingReaderImpl : public ReaderMixin, auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); auto self = shared_from_this(); - auto init_finished = - rb_gen().Then([self, rb_gen, max_readahead + auto init_finished = rb_gen().Then([self, rb_gen, max_readahead #ifdef ARROW_WITH_OPENTELEMETRY - , init_span = std::move(init_span), read_span = std::move(read_span) + , + init_span = std::move(init_span), + read_span = std::move(read_span) #endif - ](const DecodedBlock& first_block) { - auto fut = - self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); + ](const DecodedBlock& first_block) { + auto fut = self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); #ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(read_span.details.get()); - raw_span->SetAttribute("batch.size_bytes", util::TotalBufferSize(*first_block.record_batch)); + opentelemetry::nostd::shared_ptr raw_span = + ::arrow::internal::tracing::UnwrapSpan(read_span.details.get()); + raw_span->SetAttribute("batch.size_bytes", + util::TotalBufferSize(*first_block.record_batch)); #endif - END_SPAN(read_span); - END_SPAN(init_span); - return fut; - }); + END_SPAN(read_span); + END_SPAN(init_span); + return fut; + }); return init_finished; } diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index 2e76abff089..b6e3fae7bdb 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -169,12 +169,12 @@ class ARROW_EXPORT AsyncTaskScheduler { #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it struct SpanWrapper { - Result> operator()() { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); - return std::move(func)(); - } - FnOnce>()> func; - opentelemetry::nostd::shared_ptr active_span; + Result> operator()() { + auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); + return std::move(func)(); + } + FnOnce>()> func; + opentelemetry::nostd::shared_ptr active_span; }; SpanWrapper wrapper{std::move(callable), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; @@ -192,12 +192,12 @@ class ARROW_EXPORT AsyncTaskScheduler { #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it struct SpanWrapper { - Result> operator()() { - auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); - return std::move(func)(); - } - FnOnce>()> func; - opentelemetry::nostd::shared_ptr active_span; + Result> operator()() { + auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); + return std::move(func)(); + } + FnOnce>()> func; + opentelemetry::nostd::shared_ptr active_span; }; SpanWrapper wrapper{std::move(callable), ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 04b96af24df..6025517823b 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -33,11 +33,11 @@ namespace arrow { namespace internal { namespace tracing { - ARROW_EXPORT - opentelemetry::trace::Tracer *GetTracer(); -} -} -} +ARROW_EXPORT +opentelemetry::trace::Tracer* GetTracer(); +} // namespace tracing +} // namespace internal +} // namespace arrow #ifdef 
_MSC_VER #pragma warning(pop) #endif From 42fc7b685296d399b38e93d84bd0350067f28bbd Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 13 Jul 2023 16:07:57 +0200 Subject: [PATCH 37/46] Added missing empty macro for ATTRIBUTE_ON_CURRENT_SPAN() --- cpp/src/arrow/util/tracing_internal.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 6025517823b..503d4e9a1a8 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -241,6 +241,7 @@ struct Scope { #define MARK_SPAN(target_span, status) #define EVENT(target_span, ...) #define EVENT_ON_CURRENT_SPAN(...) +#define ATTRIBUTE_ON_CURRENT_SPAN(...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future) #define PROPAGATE_SPAN_TO_GENERATOR(generator) From b64ca22a19699cb7b5fe455b19a399efb12ccd03 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 11 Jul 2023 17:57:13 +0200 Subject: [PATCH 38/46] Revert "Propagating span context to SimpleTask submission functions" This reverts commit 407bcb2704cff1746be67e8450444e4181114067. # Conflicts: # cpp/src/arrow/util/async_util.h # cpp/src/arrow/util/tracing_internal.h --- cpp/src/arrow/util/async_util.h | 52 +++++++-------------------- cpp/src/arrow/util/tracing_internal.h | 12 ++----- 2 files changed, 16 insertions(+), 48 deletions(-) diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h index b6e3fae7bdb..2668ae22260 100644 --- a/cpp/src/arrow/util/async_util.h +++ b/cpp/src/arrow/util/async_util.h @@ -30,7 +30,7 @@ #include "arrow/util/iterator.h" #include "arrow/util/mutex.h" #include "arrow/util/thread_pool.h" -#include "arrow/util/tracing_internal.h" +#include "arrow/util/tracing.h" namespace arrow { @@ -142,16 +142,17 @@ class ARROW_EXPORT AsyncTaskScheduler { bool AddAsyncGenerator(std::function()> generator, std::function visitor, std::string_view name); + template struct SimpleTask : public Task { - SimpleTask(FnOnce>()> callable, std::string_view name) + SimpleTask(Callable callable, std::string_view name) : callable(std::move(callable)), name_(name) {} - SimpleTask(FnOnce>()> callable, std::string name) + SimpleTask(Callable callable, std::string name) : callable(std::move(callable)), owned_name_(std::move(name)) { name_ = *owned_name_; } - Result> operator()() override { return std::move(callable)(); } + Result> operator()() override { return callable(); } std::string_view name() const override { return name_; } - FnOnce>()> callable; + Callable callable; std::string_view name_; std::optional owned_name_; }; @@ -165,46 +166,19 @@ class ARROW_EXPORT AsyncTaskScheduler { /// future completes. It is used for debugging and tracing. 
/// /// \see AddTask for more details - bool AddSimpleTask(FnOnce>()> callable, std::string_view name) { -#ifdef ARROW_WITH_OPENTELEMETRY - // Wrap the task to propagate a parent tracing span to it - struct SpanWrapper { - Result> operator()() { - auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); - return std::move(func)(); - } - FnOnce>()> func; - opentelemetry::nostd::shared_ptr active_span; - }; - SpanWrapper wrapper{std::move(callable), - ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; - return AddTask(std::make_unique(std::move(wrapper), name)); -#else - return AddTask(std::make_unique(std::move(callable), name)); -#endif + template + bool AddSimpleTask(Callable callable, std::string_view name) { + return AddTask(std::make_unique>(std::move(callable), name)); } /// Add a task with cost 1 to the scheduler /// /// This is an overload of \see AddSimpleTask that keeps `name` alive /// in the task. - bool AddSimpleTask(FnOnce>()> callable, std::string name) { -#ifdef ARROW_WITH_OPENTELEMETRY - // Wrap the task to propagate a parent tracing span to it - struct SpanWrapper { - Result> operator()() { - auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span); - return std::move(func)(); - } - FnOnce>()> func; - opentelemetry::nostd::shared_ptr active_span; - }; - SpanWrapper wrapper{std::move(callable), - ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; - return AddTask(std::make_unique(std::move(wrapper), std::move(name))); -#else - return AddTask(std::make_unique(std::move(callable), std::move(name))); -#endif + template + bool AddSimpleTask(Callable callable, std::string name) { + return AddTask( + std::make_unique>(std::move(callable), std::move(name))); } /// Construct a scheduler diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 503d4e9a1a8..d21ecb9d751 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -21,7 +21,6 @@ // Pick up ARROW_WITH_OPENTELEMETRY first #include "arrow/util/config.h" -#include "arrow/util/visibility.h" #ifdef ARROW_WITH_OPENTELEMETRY #ifdef _MSC_VER @@ -30,14 +29,6 @@ #endif #include #include -namespace arrow { -namespace internal { -namespace tracing { -ARROW_EXPORT -opentelemetry::trace::Tracer* GetTracer(); -} // namespace tracing -} // namespace internal -} // namespace arrow #ifdef _MSC_VER #pragma warning(pop) #endif @@ -47,12 +38,15 @@ opentelemetry::trace::Tracer* GetTracer(); #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" #include "arrow/util/tracing.h" +#include "arrow/util/visibility.h" namespace arrow { namespace internal { namespace tracing { #ifdef ARROW_WITH_OPENTELEMETRY +ARROW_EXPORT +opentelemetry::trace::Tracer* GetTracer(); inline void MarkSpan(const Status& s, opentelemetry::trace::Span* span) { if (!s.ok()) { From 898b09361e51b61a8ac5ddc842af223ebc05768d Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 14 Jul 2023 14:36:35 +0200 Subject: [PATCH 39/46] Propagating parent span through task submission --- cpp/src/arrow/util/async_util.cc | 36 ++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index f8c2111813b..fdd8385ab91 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -159,6 +159,22 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { if (IsAborted()) { return false; } +#ifdef 
ARROW_WITH_OPENTELEMETRY + // Wrap the task to propagate a parent tracing span to it + struct WrapperTask : public Task { + WrapperTask(std::unique_ptr target, opentelemetry::nostd::shared_ptr parent_span) + : target(std::move(target)), parent_span(std::move(parent_span)) {} + Result> operator()() override { + auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span); + return (*target)(); + } + int cost() const override { return target->cost(); } + std::string_view name() const override { return target->name(); } + std::unique_ptr target; + opentelemetry::nostd::shared_ptr parent_span; + }; + task = std::make_unique(std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan()); +#endif SubmitTaskUnlocked(std::move(task), std::move(lk)); return true; } @@ -194,7 +210,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { if (!submit_result->TryAddCallback([this, task_inner = std::move(task)]() mutable { return [this, task_inner2 = std::move(task_inner)](const Status& st) { #ifdef ARROW_WITH_OPENTELEMETRY - TraceTaskFinished(task_inner2.get()); + //TraceTaskFinished(task_inner2.get()); #endif OnTaskFinished(st); }; @@ -245,7 +261,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { // It's important that the task's span be active while we run the submit function. // Normally the submit function should transfer the span to the thread task as the // active span. - auto scope = TraceTaskSubmitted(task.get(), span_); +// auto scope = TraceTaskSubmitted(task.get(), span_); #endif running_tasks_++; lk.unlock(); @@ -285,6 +301,22 @@ class ThrottledAsyncTaskSchedulerImpl } bool AddTask(std::unique_ptr task) override { +#ifdef ARROW_WITH_OPENTELEMETRY + // Wrap the task to propagate a parent tracing span to it + struct WrapperTask : public Task { + WrapperTask(std::unique_ptr target, opentelemetry::nostd::shared_ptr parent_span) + : target(std::move(target)), parent_span(std::move(parent_span)) {} + Result> operator()() override { + auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span); + return (*target)(); + } + int cost() const override { return target->cost(); } + std::string_view name() const override { return target->name(); } + std::unique_ptr target; + opentelemetry::nostd::shared_ptr parent_span; + }; + task = std::make_unique(std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan()); +#endif std::unique_lock lk(mutex_); // If the queue isn't empty then don't even try and acquire the throttle // We can safely assume it is either blocked or in the middle of trying to From 78ea49712be46933cb8d47e12a34dff59946218f Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 14 Jul 2023 14:49:02 +0200 Subject: [PATCH 40/46] Increased queue size for tracing to avoid dropping spans --- cpp/src/arrow/util/tracing_internal.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 58668cab18b..583921ce0f5 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -156,9 +156,9 @@ nostd::shared_ptr InitializeSdkTracerProvider() { auto exporter = InitializeExporter(); if (exporter) { sdktrace::BatchSpanProcessorOptions options; - options.max_queue_size = 16384; + options.max_queue_size = 65536; options.schedule_delay_millis = std::chrono::milliseconds(500); - options.max_export_batch_size = 16384; + options.max_export_batch_size = 65536; auto processor = 
std::make_unique(std::move(exporter), options); return std::make_shared(std::move(processor)); From 4028cd616d66cd1a53fcb765453edc18cefcc143 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 14 Jul 2023 21:48:00 +0200 Subject: [PATCH 41/46] Removed task submission trace functions --- cpp/src/arrow/util/async_util.cc | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index fdd8385ab91..7c0c97e75d1 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -122,19 +122,6 @@ class FifoQueue : public ThrottledAsyncTaskScheduler::Queue { std::list> tasks_; }; -#ifdef ARROW_WITH_OPENTELEMETRY -::arrow::internal::tracing::Scope TraceTaskSubmitted(AsyncTaskScheduler::Task* task, - const util::tracing::Span& parent) { - return START_SCOPED_SPAN_WITH_PARENT_SV( - task->span, parent, task->name(), - {{"task.cost", task->cost()}, {"span.type", "AsyncTask"}}); -} - -void TraceTaskFinished(AsyncTaskScheduler::Task* task) { END_SPAN(task->span); } - -void TraceSchedulerAbort(const Status& error) { EVENT_ON_CURRENT_SPAN(error.ToString()); } -#endif - class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { public: using Task = AsyncTaskScheduler::Task; @@ -209,9 +196,6 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { // Capture `task` to keep it alive until finished if (!submit_result->TryAddCallback([this, task_inner = std::move(task)]() mutable { return [this, task_inner2 = std::move(task_inner)](const Status& st) { -#ifdef ARROW_WITH_OPENTELEMETRY - //TraceTaskFinished(task_inner2.get()); -#endif OnTaskFinished(st); }; })) { @@ -235,7 +219,8 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { if (!IsAborted()) { maybe_error_ = st; #ifdef ARROW_WITH_OPENTELEMETRY - TraceSchedulerAbort(st); + EVENT(span(), "Task aborted", + {{"Error", st.ToString()}}); #endif // Add one more "task" to represent running the abort callback. This // will prevent any other task finishing and marking the scheduler finished @@ -257,12 +242,6 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { AbortUnlocked(stop_token_.Poll(), std::move(lk)); return; } -#ifdef ARROW_WITH_OPENTELEMETRY - // It's important that the task's span be active while we run the submit function. - // Normally the submit function should transfer the span to the thread task as the - // active span. 
-// auto scope = TraceTaskSubmitted(task.get(), span_); -#endif running_tasks_++; lk.unlock(); return DoSubmitTask(std::move(task)); From 19522e5692ea4ce5d778f2f2e69cd933c255abc8 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 14 Jul 2023 21:49:24 +0200 Subject: [PATCH 42/46] Formatting --- cpp/src/arrow/util/async_util.cc | 53 +++++++++++++++++--------------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc index 7c0c97e75d1..df84b53e82c 100644 --- a/cpp/src/arrow/util/async_util.cc +++ b/cpp/src/arrow/util/async_util.cc @@ -149,18 +149,21 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it struct WrapperTask : public Task { - WrapperTask(std::unique_ptr target, opentelemetry::nostd::shared_ptr parent_span) - : target(std::move(target)), parent_span(std::move(parent_span)) {} - Result> operator()() override { - auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span); - return (*target)(); - } - int cost() const override { return target->cost(); } - std::string_view name() const override { return target->name(); } - std::unique_ptr target; - opentelemetry::nostd::shared_ptr parent_span; + WrapperTask( + std::unique_ptr target, + opentelemetry::nostd::shared_ptr parent_span) + : target(std::move(target)), parent_span(std::move(parent_span)) {} + Result> operator()() override { + auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span); + return (*target)(); + } + int cost() const override { return target->cost(); } + std::string_view name() const override { return target->name(); } + std::unique_ptr target; + opentelemetry::nostd::shared_ptr parent_span; }; - task = std::make_unique(std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan()); + task = std::make_unique( + std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan()); #endif SubmitTaskUnlocked(std::move(task), std::move(lk)); return true; @@ -219,8 +222,7 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler { if (!IsAborted()) { maybe_error_ = st; #ifdef ARROW_WITH_OPENTELEMETRY - EVENT(span(), "Task aborted", - {{"Error", st.ToString()}}); + EVENT(span(), "Task aborted", {{"Error", st.ToString()}}); #endif // Add one more "task" to represent running the abort callback. 
This // will prevent any other task finishing and marking the scheduler finished @@ -283,18 +285,21 @@ class ThrottledAsyncTaskSchedulerImpl #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it struct WrapperTask : public Task { - WrapperTask(std::unique_ptr target, opentelemetry::nostd::shared_ptr parent_span) - : target(std::move(target)), parent_span(std::move(parent_span)) {} - Result> operator()() override { - auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span); - return (*target)(); - } - int cost() const override { return target->cost(); } - std::string_view name() const override { return target->name(); } - std::unique_ptr target; - opentelemetry::nostd::shared_ptr parent_span; + WrapperTask( + std::unique_ptr target, + opentelemetry::nostd::shared_ptr parent_span) + : target(std::move(target)), parent_span(std::move(parent_span)) {} + Result> operator()() override { + auto scope = arrow::internal::tracing::GetTracer()->WithActiveSpan(parent_span); + return (*target)(); + } + int cost() const override { return target->cost(); } + std::string_view name() const override { return target->name(); } + std::unique_ptr target; + opentelemetry::nostd::shared_ptr parent_span; }; - task = std::make_unique(std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan()); + task = std::make_unique( + std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan()); #endif std::unique_lock lk(mutex_); // If the queue isn't empty then don't even try and acquire the throttle From b9df0bea318ad86f2d5eeac6440be1a94825d192 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 17 Jul 2023 15:41:12 +0200 Subject: [PATCH 43/46] Updated some namespace usages --- cpp/src/arrow/acero/project_node.cc | 2 +- cpp/src/arrow/acero/source_node.cc | 4 ++-- cpp/src/arrow/compute/function.cc | 2 +- cpp/src/arrow/ipc/reader.cc | 2 +- cpp/src/arrow/ipc/writer.cc | 2 +- cpp/src/arrow/util/io_util.cc | 6 +++--- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/acero/project_node.cc b/cpp/src/arrow/acero/project_node.cc index 3a1da4f5d4e..d69524fca6d 100644 --- a/cpp/src/arrow/acero/project_node.cc +++ b/cpp/src/arrow/acero/project_node.cc @@ -79,7 +79,7 @@ class ProjectNode : public MapNode { Result ProcessBatch(ExecBatch batch) override { std::vector values{exprs_.size()}; - util::tracing::Span span; + arrow::util::tracing::Span span; START_COMPUTE_SPAN(span, "Project", {{"project.length", batch.length}, {"input_batch.size_bytes", batch.TotalBufferSize()}}); diff --git a/cpp/src/arrow/acero/source_node.cc b/cpp/src/arrow/acero/source_node.cc index d380e1b8bb7..b4634ba2290 100644 --- a/cpp/src/arrow/acero/source_node.cc +++ b/cpp/src/arrow/acero/source_node.cc @@ -133,7 +133,7 @@ struct SourceNode : ExecNode, public TracedNode { plan_->query_context()->ScheduleTask( [this, morsel_length, use_legacy_batching, initial_batch_index, morsel, has_ordering = !ordering_.is_unordered()]() { - util::tracing::Span span; + arrow::util::tracing::Span span; START_SPAN(span, "SourceNode::ProcessMorsel"); int64_t offset = 0; int batch_index = initial_batch_index; @@ -187,7 +187,7 @@ struct SourceNode : ExecNode, public TracedNode { options.should_schedule = ShouldSchedule::IfDifferentExecutor; ARROW_ASSIGN_OR_RAISE(Future<> scan_task, plan_->query_context()->BeginExternalTask( "SourceNode::DatasetScan")); - util::tracing::Span span; + arrow::util::tracing::Span span; START_SPAN(span, "SourceNode::DatasetScan"); if 
(!scan_task.is_valid()) { diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index c0433145dd1..7ec5123c073 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -219,7 +219,7 @@ struct FunctionExecutorImpl : public FunctionExecutor { } Result Execute(const std::vector& args, int64_t passed_length) override { - util::tracing::Span span; + arrow::util::tracing::Span span; auto func_kind = func.kind(); const auto& func_name = func.name(); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 841103d3b2f..4d696fd0c19 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -514,7 +514,7 @@ Status DecompressBuffers(Compression::type compression, const IpcReadOptions& op return ::arrow::internal::OptionalParallelFor( options.use_threads, static_cast(buffers.size()), [&](int i) { - ::arrow::util::tracing::Span span; + util::tracing::Span span; START_SPAN(span, "arrow::ipc::DecompressBuffer", {{"buffer_index", i}, {"ipc.compression.codec", codec.get()->name().c_str()}, diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 9e123ffc16f..32e17ab726d 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -233,7 +233,7 @@ class RecordBatchSerializer { auto CompressOne = [&](size_t i) { if (out_->body_buffers[i]->size() > 0) { - ::arrow::util::tracing::Span span; + util::tracing::Span span; START_SPAN(span, "arrow::ipc::CompressBuffers", {{"buffer_index", i}, {"ipc.compression.codec", options_.codec.get()->name().c_str()}, diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 72c01154c2b..3429fba28e3 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1629,7 +1629,7 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { HANDLE handle = reinterpret_cast(_get_osfhandle(fd)); #endif int64_t total_bytes_read = 0; - ::arrow::util::tracing::Span span; + util::tracing::Span span; START_SPAN(span, "FileRead", {{"fd", fd}}); while (total_bytes_read < nbytes) { @@ -1670,7 +1670,7 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes) { int64_t bytes_read = 0; - ::arrow::util::tracing::Span span; + util::tracing::Span span; START_SPAN(span, "FileReadAt", {{"fd", fd}}); while (bytes_read < nbytes) { @@ -1699,7 +1699,7 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) { int64_t bytes_written = 0; - ::arrow::util::tracing::Span span; + util::tracing::Span span; START_SPAN(span, "FileWrite", {{"nbytes", nbytes}, {"fd", fd}}); while (bytes_written < nbytes) { From 07d501226ad0c13360df5b4bfe1c3ea839ccccfd Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 17 Jul 2023 16:33:12 +0200 Subject: [PATCH 44/46] Wrote some documentation about the structure of traces --- docs/source/cpp/opentelemetry.rst | 71 +++++++++++++++++-------------- 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/docs/source/cpp/opentelemetry.rst b/docs/source/cpp/opentelemetry.rst index 91858d00b3d..b8eca0f4b79 100644 --- a/docs/source/cpp/opentelemetry.rst +++ b/docs/source/cpp/opentelemetry.rst @@ -92,8 +92,8 @@ file, input batch, internal chunk of data (called Morsel, consisting of 128k rows by default) and per output file (possibly also divided by columns). 
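For reference, the C++ pattern that creates these spans looks roughly like the
sketch below. The component, span name, and attribute names are purely
illustrative; only the helpers from ``arrow/util/tracing_internal.h`` (which
are internal to Arrow, not a public API) are real::

    #include <cstdint>

    #include "arrow/status.h"
    #include "arrow/util/tracing_internal.h"

    // Sketch of the instrumentation pattern used in Arrow's C++ internals.
    // "MyComponent::ProcessChunk" and the attribute names are placeholders.
    arrow::Status ProcessChunk(int64_t nbytes) {
      ::arrow::util::tracing::Span span;
      // Start a span and attach the attributes that are known up front.
      START_SPAN(span, "MyComponent::ProcessChunk", {{"nbytes", nbytes}});
      // ... do the actual work ...
      int64_t bytes_processed = nbytes;
      // Values that only become known afterwards can be attached to the
      // currently active span.
      ATTRIBUTE_ON_CURRENT_SPAN("bytes_processed", bytes_processed);
      return arrow::Status::OK();
    }
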
In practice, this means that for each MB of data processed by Acero, it will produce 10 - 20 spans. Choose a suitably sized dataset that strikes a balance -between being representative for the workload, but not too large to be -inspected with (or even ingested by!) a span visualizer such as Jaeger. +between being representative for the real-world workload, but not too large to +be inspected with (or even ingested by!) a span visualizer such as Jaeger. Additional background on tracing -------------------------------- @@ -110,9 +110,8 @@ Each span instance can have various attributes added to it when it is created. This allows us to capture the exact size of each block of data and the amount of time each node in the execution graph has spent on it. -Logical/Physical spans +Span hierarchy ---------------------- -TODO: This concept needs to be defined further. For example, currently there exists spans that are not completely logical or physical: they include task submission (so they do not perform any work all the time), but also some actual work. What to do with these? (see next section) Traces are organized in a hierarchical fashion, where each span except the root span has parents and can have any number of children. If a span has a child span active during its lifetime, this usually means that @@ -120,25 +119,40 @@ this parent span is not actually in control of the CPU. Thus, calculating the total CPU time is not as easy as adding up all of the span durations; only the time that a span does not have any active children (this is often referred to as the "self-time") should count. -However, Acero is a multi-threaded engine, so it is likely that there should -in fact be multiple spans performing work on a CPU at any given time! - -To model this, we have created 2 types of spans; logical spans and physical spans. -Logical spans do not perform work themselves, they are created to track the -lifetime of a certain resource (like a scheduler or a block of data). -Physical spans perform actual work. However, the self-time concept may apply, -so it is still not possible to simply aggregate all physical spans to get the -total CPU time! Aggregating all durations of top-level physical -spans (whose parent is a logical span) should give meaningful data though. (TODO: verify) - -Asynchronous behavior ---------------------- -Acero makes extensive use of asynchronous behavior. Many sections of code are +However, Acero is a multi-threaded engine, so it is likely that there are +in fact multiple spans performing work on a CPU at any given time! + +To achieve this multi-threaded behavior, many sections of code are executed through a task scheduling mechanism. When these tasks are scheduled, -they can start execution immediately or some time in the future. If a span is -active during that time, it will also track this task submission time. Tracking -this can be interesting, to see if e.g. the start latency is high, but care must -be taken to not aggregate this time into actual CPU processing time. +they can start execution immediately or some time in the future. +Often, a certain span is active that represents the lifetime of some resource +(like a scanner, but also a certain batch of data) that functions as the parent +of a set of spans where actual compute happens. +Care must be taken when aggregating the durations of these spans. + +Structure of Acero traces +------------------------- +Acero traces are structured to allow following pieces of data as they flow +through the graph. 
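Given this parent/child structure, the per-span "self-time" mentioned in the
previous section can be computed from an exported trace with a small
post-processing step, for example as below. The span record layout is
hypothetical (not an Arrow or OpenTelemetry data structure), and the sketch
assumes the children of a span do not overlap each other::

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Minimal span record as it might be exported by a trace backend.
    struct SpanRecord {
      std::string id;
      std::string parent_id;  // empty for the root span
      int64_t start_ns;
      int64_t end_ns;
    };

    // Self-time = own duration minus the time covered by direct children.
    // Overlapping children (parallel tasks under one parent) would require
    // an interval union instead of this plain subtraction.
    std::unordered_map<std::string, int64_t> SelfTimes(
        const std::vector<SpanRecord>& spans) {
      std::unordered_map<std::string, int64_t> self;
      for (const auto& s : spans) {
        self[s.id] = s.end_ns - s.start_ns;
      }
      for (const auto& s : spans) {
        if (!s.parent_id.empty()) {
          self[s.parent_id] -= s.end_ns - s.start_ns;
        }
      }
      return self;
    }

    int main() {
      std::vector<SpanRecord> spans = {
          {"root", "", 0, 100},  // e.g. a SourceNode::ProcessMorsel span
          {"child_a", "root", 10, 40},
          {"child_b", "root", 50, 90},
      };
      for (const auto& [id, ns] : SelfTimes(spans)) {
        std::cout << id << ": " << ns << " ns of self-time\n";
      }
      return 0;
    }
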
Each node's function (a kernel) is represented as a child
+span of the preceding node.
+Acero uses "Morsel-driven parallelism" where batches of data called "morsels"
+flow through the graph.
+The morsels are produced by e.g. a DatasetScanner.
+First, the DatasetScanner reads files (called Fragments) into Batches.
+Depending on the size of the fragments, it will produce several Batches per
+Fragment.
+Then, it may slice the Batches so they conform to the maximum size of a
+morsel.
+Each morsel has a top-level span called ProcessMorsel.
+Currently, the DatasetScanner cannot connect its output to the ProcessMorsel
+spans due to the asynchronous structure of the code.
+The dataset writer will gather batches of data in its staging area, and will
+issue a write operation once it has enough rows.
+This is represented by the DatasetWriter::Push and DatasetWriter::Pop spans.
+These also carry the current fill level of the staging area.
+Because batches are staged until enough rows accumulate, some morsels will not
+trigger a write: only when a morsel causes the staging area to exceed its
+threshold does a DatasetWriter::Pop perform the actual write operation.

 Backpressure
@@ -149,15 +163,10 @@ it can ask its preceding nodes to slow down. This process is called
 - the buffer capacity for the node is almost full
 - the maximum number of concurrently open files is reached
 Relevant events such as a node applying/releasing backpressure, or an async task
-group/scheduler throttling task submission, are posted as events to their
-logical span (i.e. the long-running span representing the lifetime of that
-scheduler/taskgroup) and can also be posted to the "local" span (that tracks
-the submission of the block of data that caused the event).
+group/scheduler throttling task submission, are posted as events to the
+top-level span that belongs to the asynchronous task scheduler, and can also
+be posted to the "local" span (that belongs to the block of data that caused
+the backpressure).

-Performing analyses on traces
------------------------------
-The durations and additional attributes of each span allows various analyses
-to be performed on them. This includes:
-- Calculating the average throughput of a certain

From 50e808c288006f3981c02af3a6f4c347b6e427b6 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Thu, 12 Oct 2023 16:05:11 +0200
Subject: [PATCH 45/46] Removed some of the more intrusive Otel spans

Most of these were so intrusive only in order to add the batch size as an
attribute when the task finishes.
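A less intrusive alternative would be to record such values at a point where
the batch is available synchronously, on whatever span happens to be active
there. A hypothetical sketch follows (not code from this patch; AnnotateBatch,
the attribute names, and the byte_size.h include location are assumptions):

    #include <memory>

    #include "arrow/record_batch.h"
    #include "arrow/result.h"
    #include "arrow/util/byte_size.h"         // assumed home of TotalBufferSize
    #include "arrow/util/tracing_internal.h"  // ATTRIBUTE_ON_CURRENT_SPAN

    // Annotate whatever span is currently active at a point where the record
    // batch is available synchronously, instead of keeping a dedicated span
    // alive inside the asynchronous continuation.
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> AnnotateBatch(
        arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch) {
      if (maybe_batch.ok() && *maybe_batch != nullptr) {
        ATTRIBUTE_ON_CURRENT_SPAN("batch.size_bytes",
                                  arrow::util::TotalBufferSize(**maybe_batch));
        ATTRIBUTE_ON_CURRENT_SPAN("batch.num_rows", (*maybe_batch)->num_rows());
      }
      return maybe_batch;
    }
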
This attribute is now no longer available on the ReadNextAsync spans, but it still exists on the MakeChunkedBatchGenerator spans (which is where the re-chunking occurs, if necessary) --- cpp/src/arrow/csv/reader.cc | 63 ++----------------------- cpp/src/arrow/dataset/dataset_writer.cc | 3 -- cpp/src/arrow/dataset/file_csv.cc | 22 +-------- 3 files changed, 4 insertions(+), 84 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 973424770a9..106d9c19cfa 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -403,8 +403,6 @@ class BlockParsingOperator { num_rows_seen_(first_row) {} Result operator()(const CSVBlock& block) { - util::tracing::Span span; - START_SPAN(span, "arrow::csv::BlockParsingOperator"); constexpr int32_t max_num_rows = std::numeric_limits::max(); auto parser = std::make_shared( io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); @@ -435,7 +433,6 @@ class BlockParsingOperator { num_rows_seen_ += parser->total_num_rows(); } RETURN_NOT_OK(block.consume_bytes(parsed_size)); - ATTRIBUTE_ON_CURRENT_SPAN("parsed_size", parsed_size); return ParsedBlock{std::move(parser), block.block_index, static_cast(parsed_size) + block.bytes_skipped}; } @@ -453,8 +450,6 @@ class BlockParsingOperator { class BlockDecodingOperator { public: Future operator()(const ParsedBlock& block) { - util::tracing::Span span; - START_SPAN(span, "arrow::csv::BlockDecodingOperator"); DCHECK(!state_->column_decoders.empty()); std::vector>> decoded_array_futs; for (auto& decoder : state_->column_decoders) { @@ -463,23 +458,6 @@ class BlockDecodingOperator { auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; auto decoded_arrays_fut = All(std::move(decoded_array_futs)); auto state = state_; -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span.details.get()); - return decoded_arrays_fut.Then( - [state, bytes_parsed_or_skipped, raw_span]( - const std::vector>>& maybe_decoded_arrays) - -> Result { - ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, - arrow::internal::UnwrapOrRaise(maybe_decoded_arrays)); - - ARROW_ASSIGN_OR_RAISE(auto batch, - state->DecodedArraysToBatch(std::move(decoded_arrays))); - raw_span->SetAttribute("arrow.csv.output_batch_size_bytes", - util::TotalBufferSize(*batch)); - return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; - }); -#else return decoded_arrays_fut.Then( [state, bytes_parsed_or_skipped]( const std::vector>>& maybe_decoded_arrays) @@ -491,7 +469,6 @@ class BlockDecodingOperator { state->DecodedArraysToBatch(std::move(decoded_arrays))); return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; }); -#endif } static Result Make(io::IOContext io_context, @@ -908,25 +885,7 @@ class StreamingReaderImpl : public ReaderMixin, Future> ReadNextAsync() override { util::tracing::Span span; START_SPAN(span, "arrow::csv::ReadNextAsync"); - auto future = record_batch_gen_(); -#ifdef ARROW_WITH_OPENTELEMETRY - auto longer_living_span = std::make_unique(std::move(span)); - future.AddCallback( - [span = std::move(longer_living_span)]( - const arrow::Result>& result) { - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(span->details.get()); - if (result.ok()) { - auto result_batch = result.ValueOrDie(); - if (result_batch) { - raw_span->SetAttribute("batch.size_bytes", - ::arrow::util::TotalBufferSize(*result_batch)); - } - } - END_SPAN((*span)); - }); -#endif - return future; + 
return record_batch_gen_(); } protected: @@ -937,9 +896,6 @@ class StreamingReaderImpl : public ReaderMixin, return Status::Invalid("Empty CSV file"); } - util::tracing::Span init_span; - START_SPAN(init_span, "arrow::csv::InitAfterFirstBuffer"); - // Create a arrow::csv::ReadNextAsync span so that grouping by that name does not // ignore the work performed for this first block. util::tracing::Span read_span; @@ -964,23 +920,10 @@ class StreamingReaderImpl : public ReaderMixin, auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); auto self = shared_from_this(); - auto init_finished = rb_gen().Then([self, rb_gen, max_readahead -#ifdef ARROW_WITH_OPENTELEMETRY - , - init_span = std::move(init_span), + auto init_finished = rb_gen().Then([self, rb_gen, max_readahead, read_span = std::move(read_span) -#endif ](const DecodedBlock& first_block) { - auto fut = self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); -#ifdef ARROW_WITH_OPENTELEMETRY - opentelemetry::nostd::shared_ptr raw_span = - ::arrow::internal::tracing::UnwrapSpan(read_span.details.get()); - raw_span->SetAttribute("batch.size_bytes", - util::TotalBufferSize(*first_block.record_batch)); -#endif - END_SPAN(read_span); - END_SPAN(init_span); - return fut; + return self->InitFromBlock(first_block, std::move(rb_gen), max_readahead, 0); }); return init_finished; } diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index d961009311d..d660f6f948f 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -212,9 +212,6 @@ class DatasetWriterFileQueue { START_SPAN(span, "DatasetWriter::Push", {{"batch.size_rows", batch->num_rows()}, {"rows_currently_staged", rows_currently_staged_}, - // staged_rows_count is updated at the end, after this push and possibly - // multiple pops - // {"staged_rows_count", writer_state_->staged_rows_count}, {"options_.min_rows_per_group", options_.min_rows_per_group}, {"max_rows_staged", writer_state_->max_rows_staged}}); staged_batches_.push_back(std::move(batch)); diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 9afef6d871d..82de5c6c8e2 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -278,11 +278,6 @@ static inline Result GetReadOptions( static inline Future> OpenReaderAsync( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options, Executor* cpu_executor) { -#ifdef ARROW_WITH_OPENTELEMETRY - auto tracer = arrow::internal::tracing::GetTracer(); - // This span also captures task submission, possibly including wait time - auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync"); -#endif ARROW_ASSIGN_OR_RAISE( auto fragment_scan_options, GetFragmentScanOptions( @@ -301,8 +296,6 @@ static inline Future> OpenReaderAsync( // input->Peek call blocks so we run the whole thing on the I/O thread pool. auto reader_fut = DeferNotOk(input->io_context().executor()->Submit( [=]() -> Future> { - util::tracing::Span lambda_span; - START_SPAN(lambda_span, "arrow::csv::PeekAndMakeAsync", {{"threadpool", "IO"}}); ARROW_ASSIGN_OR_RAISE(auto first_block, input->Peek(reader_options.block_size)); const auto& parse_options = format.parse_options; @@ -310,30 +303,17 @@ static inline Future> OpenReaderAsync( auto convert_options, GetConvertOptions(format, scan_options ? 
scan_options.get() : nullptr, first_block)); - auto fut = csv::StreamingReader::MakeAsync( + return csv::StreamingReader::MakeAsync( io::default_io_context(), std::move(input), cpu_executor, reader_options, parse_options, convert_options); - return fut.Then([lambda_span = std::move(lambda_span)]( - const std::shared_ptr& reader) { - END_SPAN(lambda_span); - return reader; - }); })); return reader_fut.Then( // Adds the filename to the error [=](const std::shared_ptr& reader) -> Result> { -#ifdef ARROW_WITH_OPENTELEMETRY - span->SetStatus(opentelemetry::trace::StatusCode::kOk); - span->End(); -#endif return reader; }, [=](const Status& err) -> Result> { -#ifdef ARROW_WITH_OPENTELEMETRY - arrow::internal::tracing::MarkSpan(err, span.get()); - span->End(); -#endif return err.WithMessage("Could not open CSV input source '", path, "': ", err); }); } From fc9c6591403914b976ad5d29a8251ae94d2f2efd Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 12 Oct 2023 16:06:22 +0200 Subject: [PATCH 46/46] Removed the low-level File I/O spans, there's just too many of them and they have limited value. I'd still like to be able to have these spans, but e.g. only if the users explicitly asks for it. --- cpp/src/arrow/util/io_util.cc | 8 -------- 1 file changed, 8 deletions(-) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 3429fba28e3..b42e8ae892d 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1629,8 +1629,6 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { HANDLE handle = reinterpret_cast(_get_osfhandle(fd)); #endif int64_t total_bytes_read = 0; - util::tracing::Span span; - START_SPAN(span, "FileRead", {{"fd", fd}}); while (total_bytes_read < nbytes) { const int64_t chunksize = @@ -1664,14 +1662,11 @@ Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { buffer += bytes_read; total_bytes_read += bytes_read; } - ATTRIBUTE_ON_CURRENT_SPAN("bytes_read", total_bytes_read); return total_bytes_read; } Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes) { int64_t bytes_read = 0; - util::tracing::Span span; - START_SPAN(span, "FileReadAt", {{"fd", fd}}); while (bytes_read < nbytes) { int64_t chunksize = @@ -1689,7 +1684,6 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb position += ret; bytes_read += ret; } - ATTRIBUTE_ON_CURRENT_SPAN("bytes_read", bytes_read); return bytes_read; } @@ -1699,8 +1693,6 @@ Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nb Status FileWrite(int fd, const uint8_t* buffer, const int64_t nbytes) { int64_t bytes_written = 0; - util::tracing::Span span; - START_SPAN(span, "FileWrite", {{"nbytes", nbytes}, {"fd", fd}}); while (bytes_written < nbytes) { const int64_t chunksize =