
@joosthooz (Contributor) commented Feb 13, 2023

Rationale for this change

The structure of the OpenTelemetry traces changed significantly in the recent PR #33738; this PR aims to improve some things that were left out of scope at that time. Specifically, it improves the tracing of the I/O side of things. We'll add spans for low-level I/O calls (e.g. write) and for portions of the Parquet writer and DatasetWriter. The goal is to be able to distinguish work performed on the I/O thread pool from work on the normal thread pool, and to see how much time was actually spent on I/O versus (de/en)coding and (de)compression.
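For illustration, the kind of low-level I/O span this adds looks like the FileRead span discussed in the review below. A minimal sketch (the wrapper function here is hypothetical, not part of the PR; macro usage mirrors the diff):

#include <unistd.h>

#include "arrow/util/tracing_internal.h"

// Hypothetical helper: wrap a raw read() syscall in a tracing span so time
// spent blocked on I/O shows up as its own span in the trace.
int64_t TracedRead(int fd, void* buf, int64_t nbytes) {
  util::tracing::Span span;
  START_SPAN(span, "FileRead", {{"fd", fd}});
  return static_cast<int64_t>(::read(fd, buf, static_cast<size_t>(nbytes)));
}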

What changes are included in this PR?

Are these changes tested?

This is a very early WIP

Are there any user-facing changes?

Arrow releases do not have telemetry enabled, so these changes should not affect normal users. But users who are interested in the performance of Acero will likely want to use the telemetry, and for them the structure and details of the spans will likely change with this PR. The current documentation of the telemetry does not include any details about this yet.

This is a rather high-level span for Parquet; there could be many spans underneath it to show individual parts like encoding and compression.
…t real work.

Also, they were not linked in any way to the next() spans in the scanner even though they should be.
@github-actions

⚠️ GitHub issue #33880 has been automatically assigned in GitHub to PR creator.

These spans are logical, because the actual work does not necessarily start right away.
Those spans became duplicates in practice, instead of having an event added to them when they were submitted.
During that time the (possibly multiple!) duplicates would remain active until the task was finally finished.
Instead, we are now adding an event to the scheduler's span that says task submission was throttled.
This allows calculating the average throughput of a task.
… favor of a ReadNextAsync span

The former was not capturing the actual work performed, judging by profiling output from e.g. VTune.
Note that the output size as reported by the arrow::csv::ReadNextAsync span corresponds to the output size of the arrow::csv::BlockDecodingOperator child of its predecessor.
The first block of data is already processed during initialization (captured by the arrow::csv::InitAfterFirstBuffer span).
It is named after the ReadNextAsync span so we can do a group-by on that name and get all the actual work performed for reading CSV.
because the span does sometimes include async wait and is not equivalent to a ReadNextAsync.
@github-actions github-actions bot added the 'awaiting review' label Mar 2, 2023
@joosthooz joosthooz force-pushed the GH-33880-improve-tracing branch from 57e61c1 to 4e0d497 on July 13, 2023 08:10
@joosthooz joosthooz marked this pull request as ready for review July 17, 2023 14:33
@joosthooz (Contributor, Author) commented:

@mapleFU I think this is ready for some feedback. This PR requires some additional context. I've focused mostly on the Dataset writers and readers; some of the "normal" readers/writers are still not instrumented with spans, and I think we should leave that out of scope. As noted in the description above, this PR is a follow-up to a previous PR that changed the structure of traces. Before, there were top-level spans for each graph node, and each item of data would have a child span underneath each of those.
Here, the goal is to use spans' parent-child relation to show how morsels flow through the graph:
[screenshot: trace spans]
These morsels are produced by the readers:
[screenshot: trace spans]
But because of how the code is written, I have not been able to connect these together. Ideally, there would be a connection between ReadNextAsync -> MakeChunkedBatchGenerator -> ProcessMorsel, but that would require more invasive code changes (passing along a span context with an ExecBatch, for example).
[screenshot: trace spans]
What I'd still like to do is add an attribute to certain spans so that their durations can be aggregated into some sensible total (like CPU time spent on the read side, each kernel, and the write side of things). I've also been working on a notebook that visualizes a trace as a per-thread activity overview; it would be nice to add something like that, but I'm not sure how.
Looking forward to any feedback

@westonpace (Member) left a comment:


I think we can tackle this PR more easily as multiple PRs but I will leave it up to you. We could have one PR for Acero-related things and one for the I/O & readers.

The spans in the CSV reader seem overly obtrusive; I'd like to see if we can find a way to avoid paths where we have an #ifdef and then repeat logic in both the if and else blocks.

I've made a few comments about how tracing was working in Acero with the task scheduler. I don't think the changes here are necessarily improving the way that was handled. Maybe we can talk this through in more detail.

Comment on lines 466 to 481
#ifdef ARROW_WITH_OPENTELEMETRY
  opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> raw_span =
      ::arrow::internal::tracing::UnwrapSpan(span.details.get());
  return decoded_arrays_fut.Then(
      [state, bytes_parsed_or_skipped, raw_span](
          const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
          -> Result<DecodedBlock> {
        ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
                              arrow::internal::UnwrapOrRaise(maybe_decoded_arrays));
        ARROW_ASSIGN_OR_RAISE(auto batch,
                              state->DecodedArraysToBatch(std::move(decoded_arrays)));
        raw_span->SetAttribute("arrow.csv.output_batch_size_bytes",
                               util::TotalBufferSize(*batch));
        return DecodedBlock{std::move(batch), bytes_parsed_or_skipped};
      });
@westonpace:

I think there is too much duplicated code in these alternate paths. Is there any way we can just pass span to the callback's capture list and use ATTRIBUTE_ON_CURRENT_SPAN? Also, what happened to END_SPAN_ON_FUTURE_COMPLETION?
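Loosely, the suggestion amounts to something like the following sketch (both macros are named above, but their exact usage here is my assumption; the span name is borrowed from the commit notes):

util::tracing::Span span;
START_SPAN(span, "arrow::csv::BlockDecodingOperator");
auto out_fut = decoded_arrays_fut.Then(
    [state, bytes_parsed_or_skipped](
        const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
        -> Result<DecodedBlock> {
      ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
                            arrow::internal::UnwrapOrRaise(maybe_decoded_arrays));
      ARROW_ASSIGN_OR_RAISE(auto batch,
                            state->DecodedArraysToBatch(std::move(decoded_arrays)));
      // Compiles to a no-op when ARROW_WITH_OPENTELEMETRY is not defined,
      // so a single code path serves both builds.
      ATTRIBUTE_ON_CURRENT_SPAN("arrow.csv.output_batch_size_bytes",
                                util::TotalBufferSize(*batch));
      return DecodedBlock{std::move(batch), bytes_parsed_or_skipped};
    });
// End the span when the future completes (usage assumed).
END_SPAN_ON_FUTURE_COMPLETION(span, out_fut);
return out_fut;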

Comment on lines 968 to 972
#ifdef ARROW_WITH_OPENTELEMETRY
,
init_span = std::move(init_span),
read_span = std::move(read_span)
#endif
@westonpace:

Why is this ifdef block needed? If tracing is disabled then init_span and read_span are empty unique_ptrs. Copying them around should be trivial.

Comment on lines 975 to 980
#ifdef ARROW_WITH_OPENTELEMETRY
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> raw_span =
::arrow::internal::tracing::UnwrapSpan(read_span.details.get());
raw_span->SetAttribute("batch.size_bytes",
util::TotalBufferSize(*first_block.record_batch));
#endif
@westonpace:

Why not ATTRIBUTE_ON_CURRENT_SPAN?

{"rows_currently_staged", rows_currently_staged_},
// staged_rows_count is updated at the end, after this push and possibly
// multiple pops
// {"staged_rows_count", writer_state_->staged_rows_count},
@westonpace:

Suggested change
// {"staged_rows_count", writer_state_->staged_rows_count},

Comment on lines 215 to 216
// staged_rows_count is updated at the end, after this push and possibly
// multiple pops
@westonpace:

I'm not sure I understand this comment

Comment on lines -309 to +319
-  TraceTaskQueued(task.get(), span());
+  EVENT(span(), "Task submission throttled",
+        {{"task.name", ::opentelemetry::nostd::string_view(task->name().data(),
+                                                           task->name().size())},
+         {"task.cost", task->cost()}});
@westonpace:

span() is the scheduler's span. This puts an event on the scheduler's span that submission was throttled. I don't think that's terribly interesting.

What I was aiming for with TraceTaskQueued was to actually start the task's span. Then, later, in TraceTaskSubmitted, it would register a "task submitted" event.

So a throttled task (that is queued for 1 time unit and takes 2 time units to run) would look something like this...

T+0 Task Span Begins
T+1 Task Submitted
T+3 Task Ends

A non-throttled task would look something like:

T+0 Task Begins
T+2 Task Ends
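
In code, that lifecycle might look roughly like the following (a sketch only; the span member on the task and these signatures are assumptions, not the actual async_util.cc API):

// Open the task's span as soon as it is queued, so time spent throttled is
// included in the span's duration.
void TraceTaskQueued(Task* task) {
  START_SPAN(task->span, "Task",
             {{"task.name", ::opentelemetry::nostd::string_view(
                                task->name().data(), task->name().size())}});
}

// When the throttle releases the task, record submission as an event on the
// task's own span instead of on the scheduler's span.
void TraceTaskSubmitted(Task* task) {
  EVENT(task->span, "Task submitted");
}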

    opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> parent_span;
  };
  task = std::make_unique<WrapperTask>(
      std::move(task), arrow::internal::tracing::GetTracer()->GetCurrentSpan());
@westonpace:

Note that I was intentionally not using GetCurrentSpan as the parent. We are submitting a new task here. I would like that new task to appear as its own span and not as a child of the thing that submitted it. For example, in Acero, in most plans, the source node is the one that schedules all the tasks. If we use GetCurrentSpan here then it will look like all the work being done is a part of the source node.
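
A sketch of that intent using the opentelemetry-cpp API (passing an invalid SpanContext as the parent to force a new root span is my assumption about the mechanism, not code from this PR):

// Start the wrapped task's span as a root span rather than as a child of
// whatever span happens to be current at submission time.
opentelemetry::trace::StartSpanOptions options;
options.parent = opentelemetry::trace::SpanContext::GetInvalid();
auto task_span =
    arrow::internal::tracing::GetTracer()->StartSpan("ScheduledTask", options);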

#endif
int64_t total_bytes_read = 0;
util::tracing::Span span;
START_SPAN(span, "FileRead", {{"fd", fd}});
@westonpace:

This is very deep in the I/O & filesystem hierarchy. Maybe it would be better to put these spans in LocalFileSystem instead?
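
For example (hypothetical placement, simplified relative to the real LocalFileSystem implementation):

Result<std::shared_ptr<io::InputStream>> LocalFileSystem::OpenInputStream(
    const std::string& path) {
  util::tracing::Span span;
  START_SPAN(span, "LocalFileSystem::OpenInputStream", {{"path", path}});
  // The underlying open/read machinery stays unchanged; only the span moves
  // up from the raw file-descriptor level to the filesystem level.
  ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(path));
  return file;
}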

auto status = reader->NextBatch(records_to_read, out);

uint64_t size_bytes = ::arrow::util::TotalBufferSize(*out->get());
ATTRIBUTE_ON_CURRENT_SPAN("parquet.arrow.output_batch_size_bytes", size_bytes);
@westonpace:

Why do we need to guard ATTRIBUTE_ON_CURRENT_SPAN with an ifdef? Couldn't we have left the ifdef where it was and moved ::arrow::util::TotalBufferSize inside the macro?
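
I.e., presumably something like this (sketch):

  auto status = reader->NextBatch(records_to_read, out);
  // If the macro expands to nothing when ARROW_WITH_OPENTELEMETRY is off, the
  // TotalBufferSize call inside it is never evaluated, so no #ifdef is needed.
  ATTRIBUTE_ON_CURRENT_SPAN("parquet.arrow.output_batch_size_bytes",
                            ::arrow::util::TotalBufferSize(*out->get()));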

Comment on lines +135 to +155
Acero traces are structured to allow following pieces of data as they flow
through the graph. Each node's function (a kernel) is represented as a child
span of the preceding node.
Acero uses "morsel-driven parallelism", where batches of data called "morsels"
flow through the graph. The morsels are produced by e.g. a DatasetScanner.
First, the DatasetScanner reads files (called Fragments) into Batches;
depending on the size of the Fragments it may produce several Batches per
Fragment. Then, it may slice the Batches so they conform to the maximum size
of a morsel. Each morsel has a top-level span called ProcessMorsel.
Currently, the DatasetScanner cannot connect its output to the ProcessMorsel
spans due to the asynchronous structure of the code.
The dataset writer gathers batches of data in its staging area and issues a
write operation once it has enough rows. This is represented by the
DatasetWriter::Push and DatasetWriter::Pop spans, which also carry the current
fill level of the staging area. This means that some morsels will not trigger
a write: only when a morsel causes the staging area to exceed its threshold is
a DatasetWriter::Pop triggered, which performs the write operation.
@westonpace:

Acero has a task scheduler. I've had the most luck when I structured spans in this way:

  • There is one span for the plan itself that starts when StartProducing is called and ends when the plan finishes.
  • Within that span there is a span for the scheduler. It should have more or less the same lifetime (these top two spans can be combined if desired).
  • As a child of that, each thread task has its own span.
  • Various operations (filtering / projection / etc.) are children of their thread task and may be nested as needed (e.g. a filtering span may have child spans for the expression evaluation).

This is sort of what I had in async_util.cc but I think these changes have altered that.
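
As a tree, that structure looks something like this (node names are illustrative):

ExecPlan
└── TaskScheduler
    ├── ThreadTask
    │   ├── Filter
    │   │   └── ExpressionEvaluation
    │   └── Project
    └── ThreadTask
        └── Filter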

@github-actions github-actions bot added the 'awaiting changes' label and removed the 'awaiting review' label Jul 21, 2023
…e-tracing

# Conflicts:
#	cpp/src/arrow/util/async_util.cc
Most of these spans were intrusive because they existed only to add the batch size as an attribute when the task finishes.
This attribute is no longer available on the ReadNextAsync spans, but it still exists on the MakeChunkedBatchGenerator spans (which is where the re-chunking occurs, if necessary).
…nd they have limited value.

I'd still like to be able to have these spans, but e.g. only if the user explicitly asks for them.
@github-actions github-actions bot added the 'awaiting change review' label and removed the 'awaiting changes' label Oct 12, 2023
@github-actions

Thank you for your contribution. Unfortunately, this pull request has been marked as stale because it has had no activity in the past 365 days. Please remove the stale label or comment below, or this PR will be closed in 14 days. Feel free to re-open this if it has been closed in error. If you do not have repository permissions to reopen the PR, please tag a maintainer.

@github-actions github-actions bot added the 'Status: stale-warning' label Nov 18, 2025
@github-actions github-actions bot closed this Dec 12, 2025

Labels

awaiting change review · Component: C++ · Component: Documentation · Component: Parquet · Status: stale-warning

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add/improve tracing in the dataset writer

3 participants