
Conversation

@lidavidm
Member

This adds spans to the scanner, one per fragment and one per batch per fragment, gated behind #ifdefs.

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@lidavidm
Member Author

lidavidm commented Dec 15, 2021

TODO:

  • Confirm that we can link these spans to another parent span.
    I originally added a parameter to ScanOptions, but this won't work since it would expose OpenTelemetry symbols in the public API. Instead, so long as a span is active when you call ScanBatchesUnorderedAsync (directly or indirectly), that span will be the parent of any span spawned inside the scanner (see the sketch after this list).
  • AddressSanitizer reports some leaks; either we need to add a suppression and report upstream, or work out why we're getting failures.
    I've added suppressions in case they help others, but I can't quite work out why it happens in the first place. Manually shutting down the thread pool results in no leaks. With the debugger I can confirm the thread pool is shutting itself down at thread exit, but for some reason, in the second case, thread-local destructors don't seem to get run. It resembles this LLVM thread (though there's no follow-up there) - this might be something we should minimize and submit upstream.
    Weirdly, adding a span on the main thread (in order to test span linking) fixes this...
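
For reference, a minimal sketch of the parenting behavior described in the first bullet. This is caller-side code, not code from this PR: the span name and scanner setup are illustrative, and it assumes a Scanner built elsewhere.

#include <opentelemetry/trace/provider.h>

arrow::Status RunScan(std::shared_ptr<arrow::dataset::Scanner> scanner) {
  auto tracer =
      opentelemetry::trace::Provider::GetTracerProvider()->GetTracer("my-app");
  auto parent_span = tracer->StartSpan("my_app::ScanDataset");
  {
    // While this scope is alive, parent_span is the active span on this
    // thread, so spans created inside the scanner become its children.
    auto scope = tracer->WithActiveSpan(parent_span);
    ARROW_ASSIGN_OR_RAISE(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
    // ... consume batches ...
  }
  parent_span->End();
  return arrow::Status::OK();
}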

Example of output:

$ env ARROW_TRACING_BACKEND=ostream ./debug/arrow-dataset-file-ipc-test --gtest_filter='TestScan/*.ScanRecordBatchReader/0AsyncThreaded16b1024r'
Running main() from /home/lidavidm/Code/upstream/arrow-15067/build/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest_main.cc
Note: Google Test filter = TestScan/*.ScanRecordBatchReader/0AsyncThreaded16b1024r
[==========] Running 1 test from 1 test suite.
[----------] Global test environment set-up.
[----------] 1 test from TestScan/TestIpcFileFormatScan
[ RUN      ] TestScan/TestIpcFileFormatScan.ScanRecordBatchReader/0AsyncThreaded16b1024r
[       OK ] TestScan/TestIpcFileFormatScan.ScanRecordBatchReader/0AsyncThreaded16b1024r (418 ms)
[----------] 1 test from TestScan/TestIpcFileFormatScan (418 ms total)

[----------] Global test environment tear-down
[==========] 1 test from 1 test suite ran. (418 ms total)
[  PASSED  ] 1 test.
{
  name          : arrow::dataset::IpcFileFormat::OpenReaderAsync
  trace_id      : 483084a5464d0ef6278da1cc982f9dd9
  span_id       : beb598d1dcb93984
  tracestate    : 
  parent_span_id: b8ec67d5f5fbc11d
  start         : 1639591037854068113
  duration      : 2869368
  description   : 
  span kind     : Internal
  status        : Ok
  attributes    : 
	thread_id: 139972187428928
  events        : 
  links         : 
  resources     : 
	service.name: unknown_service
	telemetry.sdk.version: 1.1.0
	telemetry.sdk.name: opentelemetry
	telemetry.sdk.language: cpp
  instr-lib     : arrow
}
{
  name          : arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next
  trace_id      : 483084a5464d0ef6278da1cc982f9dd9
  span_id       : 6462f59e9d0532df
  tracestate    : 
  parent_span_id: b8ec67d5f5fbc11d
  start         : 1639591037864180748
  duration      : 3811981
  description   : 
  span kind     : Internal
  status        : Ok
  attributes    : 
	thread_id: 139972187428928
  events        : 
  links         : 
  resources     : 
	service.name: unknown_service
	telemetry.sdk.version: 1.1.0
	telemetry.sdk.name: opentelemetry
	telemetry.sdk.language: cpp
  instr-lib     : arrow
}
(snip)
{
  name          : arrow::dataset::FragmentToBatches
  trace_id      : 483084a5464d0ef6278da1cc982f9dd9
  span_id       : b8ec67d5f5fbc11d
  tracestate    : 
  parent_span_id: 0000000000000000
  start         : 1639591037853521954
  duration      : 169164805
  description   : 
  span kind     : Internal
  status        : Ok
  attributes    : 
	thread_id: 139972070205184
	arrow.dataset.fragment.type_name: ipc
	arrow.dataset.fragment.last: 1
	arrow.dataset.fragment.index: 0
	arrow.dataset.fragment: <Buffer>
  events        : 
  links         : 
  resources     : 
	service.name: unknown_service
	telemetry.sdk.version: 1.1.0
	telemetry.sdk.name: opentelemetry
	telemetry.sdk.language: cpp
  instr-lib     : arrow
}

@lidavidm
Member Author

CC @westonpace. You will need ARROW-15044/#11925 to actually see the spans (or a local OTLP collector instance).

I will circle back and try to isolate the ASan issue later. That said, I think the impact is minimal: it shouldn't affect CI, we can disable ASan or use the suppressions for local development, and it wouldn't be enabled for things like Conbench anyway.

I haven't quantified the performance impact here, either. (Do we have a benchmark that would stress these paths? I can run it locally and report.)

@lidavidm lidavidm marked this pull request as ready for review December 15, 2021 20:20
@westonpace westonpace self-requested a review December 15, 2021 22:31
@lidavidm
Member Author

Rebased with ARROW-15044.

Member

@westonpace westonpace left a comment

Looks very useful. Just a few (potentially naive) thoughts.

Comment on lines +152 to +155
Member

Could we push this ifdef into StartSpan by returning a dummy span object with no-op methods?

Member Author

We could. I didn't want to wrap too much of the API; also, I figured the explicit #ifdef would be best if people were very concerned about overhead.
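
For reference, the dummy-span idea could look roughly like the sketch below. SpanHandle is a hypothetical name; only GetTracer() comes from this PR.

#ifdef ARROW_WITH_OPENTELEMETRY
struct SpanHandle {
  opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span;
  void End() { span->End(); }
};
inline SpanHandle StartSpan(const char* name) {
  return SpanHandle{GetTracer()->StartSpan(name)};
}
#else
struct SpanHandle {
  void End() {}  // no-op when tracing is compiled out
};
inline SpanHandle StartSpan(const char*) { return SpanHandle{}; }
#endif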

Comment on lines +136 to +144
template <typename T>
AsyncGenerator<T> PropagateSpanThroughAsyncGenerator(
    AsyncGenerator<T> wrapped,
    opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span) {
  return [=]() mutable -> Future<T> {
    auto scope = GetTracer()->WithActiveSpan(span);
    return wrapped();
  };
}
Member

On second glance this helper method seems a little off to me. Is the "active span" a thread-local concept? Will this work even if wrapped() launches its task on a separate thread (e.g. an I/O operation)?

Member Author

Yes, the active span is thread-local. wrapped() must manually propagate the active span (or manually pass the span through) if it itself spawns a thread. That is a disadvantage: it makes it hard to use OpenTelemetry while still making it possible to completely remove it at compile time.

One way to get around this would be to instrument the Executor and possibly Future classes themselves, but I worry this would have more overhead than is desirable. (Or maybe not. I haven't tried.)

Member

Concretely I'm thinking of...

#ifdef ARROW_WITH_OPENTELEMETRY
  batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(
      std::move(batch_gen_gen));
#endif

If you are I/O bound then I would expect batch_gen_gen to be transferring to an I/O thread (and back) for every item. There are "async-local" concepts (e.g. https://docs.microsoft.com/en-us/dotnet/api/system.threading.asynclocal-1?view=net-6.0) so maybe we need to adopt something like that. I think that's the same thing as "instrumenting the executor and possibly future classes themselves". I think it would be fairly affordable (submitting a thread task would have to copy a handle to the active span or "async context" to include as part of the task, and then the first thing in the task would be setting the active span based on that handle).
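
A rough sketch of that idea, using opentelemetry-cpp's RuntimeContext (the wrapper name here is hypothetical, not an existing Arrow API):

#include <opentelemetry/context/runtime_context.h>

// Capture the submitting thread's context (which includes the active span)
// and reattach it on whichever thread actually runs the task.
template <typename Callable>
auto WrapWithActiveContext(Callable task) {
  auto context = opentelemetry::context::RuntimeContext::GetCurrent();
  return [context, task = std::move(task)]() mutable {
    auto token = opentelemetry::context::RuntimeContext::Attach(context);
    return task();
  };
}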

Member Author

Yes, I think that's essentially the same (OpenTelemetry maintains a context that is thread-local by default; I think it can even be swapped out, depending on how we want to go about things). I'll try to take a look at this approach when I get a chance.

Member

Should it be done in a follow-up? I think, at the moment, the consequence would be that spans don't have proper parentage, but other than that it should be fairly harmless.

Member Author

Yeah, I think we can do that. If it pans out we can hopefully replace the manual instrumentation done here. I'll file a JIRA to explore this further.

Member

@westonpace westonpace left a comment

Thanks for doing this. I'm still not really in love with the proliferation of #ifdefs, but I understand the motivation, and I think we can get a better sense of what that feels like later.

auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
Member

Technically the GetReadOptions call could fail and bail out of the method without marking the span as finished. I'm not sure there is an easy, good solution, though, since we aren't using exceptions. Any ideas?

Member Author

The span will be marked as finished in its destructor by default. Explicit marking is only required when you want to control the end time: https://github.com/open-telemetry/opentelemetry-cpp/blob/f20f72f3a904b215fc750b67b206f158aeb61241/sdk/src/trace/span.cc#L89-L92
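
In other words, the early-return case above is already covered. A minimal sketch of the pattern (GetOptions is a hypothetical fallible call, not from this PR):

arrow::Status OpenReader() {
  auto span = GetTracer()->StartSpan("OpenReader");
  ARROW_ASSIGN_OR_RAISE(auto options, GetOptions());  // may bail out early
  // ... use options ...
  span->End();  // optional; only needed to control the end time
  return arrow::Status::OK();
  // If ARROW_ASSIGN_OR_RAISE returned early, ~Span() ends the span instead.
}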

Member

Ah, that makes sense!

@lidavidm
Member Author

lidavidm commented Jan 7, 2022

#12100 would help with the ifdef proliferation, if we want to build on that.

@lidavidm
Member Author

lidavidm commented Feb 3, 2022

Continued in #12328.

lidavidm added a commit that referenced this pull request Apr 7, 2022
Continuing #12328 and #11964.
The tracing spans were not propagated through all the asynchronous constructs, causing some spans to become disconnected from the trace. This PR aims to address this.
Some things left to do:
- [x] Possibly add some attributes to the `read_column` span
- [x] fix parent/sibling relationships (some of the new spans should probably become a child)
- [x] Do something about all the `#ifdefs`
- [x] Wrap around a `Future`
- [x] Wrap `Executor`
- [x] Check if tracing now works properly for all of the file types, not just parquet
- [x] lidavidm mentioned some memory leaks that should be investigated
- [x] The `FragmentToBatches` span seems to be active way too long

Closes #12609 from joosthooz/arrow-15067

Lead-authored-by: Joost Hoozemans <joosthooz@msn.com>
Co-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
Signed-off-by: David Li <li.davidm96@gmail.com>