Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c78f0cf
ARROW-15067: [C++] Add tracing spans to the scanner
lidavidm Dec 14, 2021
18dd557
ARROW-15067: [C++] Add ASAN suppressions for OTel
lidavidm Dec 15, 2021
12abf3b
ARROW-15067: [C++] Add Valgrind suppressions for OTel
lidavidm Dec 15, 2021
d962563
ARROW-15067: [C++] Ensure spans propagate through call to scanner
lidavidm Dec 15, 2021
7385c96
ARROW-15044: [C++] Use Then()
lidavidm Dec 20, 2021
26d41e6
Fix rebase. Scanner::Scan was removed in ARROW-13554.
mbrobbel Feb 3, 2022
c5805e0
Propagating spans to asynchronous parquet reader
joosthooz Mar 11, 2022
f74b77e
Removed a span layer from scanner, added some config ifdefs
joosthooz Mar 11, 2022
e4fa2c9
Code formatting
joosthooz Mar 11, 2022
6f97ba5
Added some attributes to parquet column reader span
joosthooz Mar 11, 2022
3e00a5f
TieSpanToAsyncGenerator now ending span when asyncgen finishes
joosthooz Mar 11, 2022
8a16e2d
Added some tracing macros to replace the prolific #ifdefs
joosthooz Mar 11, 2022
25e654d
Processed review comments
joosthooz Mar 18, 2022
837d56e
Changed lambda capture list to = to prevent errors when building with…
joosthooz Mar 18, 2022
e742035
Attempt at creating a struct wrapper for Future
joosthooz Mar 18, 2022
17cb609
Forward declaring GetTracer in Future
joosthooz Mar 18, 2022
dac4c0a
Modifications to wrapper struct, still not working
joosthooz Mar 18, 2022
05d4b41
Moved tracing span wrapper to ConcreteFutureImpl (still not working yet)
joosthooz Mar 21, 2022
f22cd44
Passing along tracing span to futures now seems to be working
joosthooz Mar 21, 2022
9482c58
Guarding trace code with #ifdef ARROW_WITH_OPENTELEMETRY
joosthooz Mar 21, 2022
287dfe7
Code formatting
joosthooz Mar 21, 2022
e3684b9
Removed unused line
joosthooz Mar 21, 2022
2a52f65
Changes otel macros to work similar to ARROW_ASSIGN...
joosthooz Mar 22, 2022
fef837e
Attempt at forward otel spans through Executor::Submit (not working yet)
joosthooz Mar 22, 2022
e4f0fde
Using std::function to wrap the struct (now templated with Func retur…
joosthooz Mar 28, 2022
4573548
Moved Executor task wrapping down into SpawnReal
joosthooz Mar 30, 2022
61ffaae
Formatting
joosthooz Mar 30, 2022
61c7af5
Addressed review comments
joosthooz Mar 30, 2022
d3cd60b
Reverted some unneeded changes
joosthooz Mar 30, 2022
b8f6c2f
Added another guarded macro
joosthooz Mar 30, 2022
78e528f
Formatting
joosthooz Mar 30, 2022
42798cc
Moving the generators when wrapping them
joosthooz Mar 30, 2022
9abf327
Formatting
joosthooz Mar 30, 2022
b1d4689
Manually setting the current span as parent is not needed
joosthooz Apr 1, 2022
f32fda8
Removed some helper functions that were too similar in nature
joosthooz Apr 1, 2022
71f6989
Merged similar tracing functions regarding generators, updated/added …
joosthooz Apr 1, 2022
4a82a7a
Addressed review comment
joosthooz Apr 1, 2022
90e2768
Formatting
joosthooz Apr 1, 2022
3e24f54
Updated macros for when opentelemetry is disabled
joosthooz Apr 1, 2022
7330f14
Removed some left-over code
joosthooz Apr 4, 2022
df2a6be
Small updates to otel helper function documentation
joosthooz Apr 4, 2022
567fa30
Adding a comment about the use of macros for helper functions
joosthooz Apr 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/build-support/lsan-suppressions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@
leak:*__new_exitfn*
# Leak at shutdown in OpenSSL
leak:CRYPTO_zalloc

# OpenTelemetry. These seem like false positives and go away if the
# CPU thread pool is manually shut down before exit.
# Note that ASan has trouble backtracing these and may not be able to
# without LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100
leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack
leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize
Comment on lines +22 to +28
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably test and ensure we don't get issues anymore once the new version of OTel comes out.

30 changes: 24 additions & 6 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/utf8.h"

namespace arrow {
Expand Down Expand Up @@ -167,9 +168,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));

ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
const auto& path = source.path();
ARROW_ASSIGN_OR_RAISE(
input, io::BufferedInputStream::Create(reader_options.block_size,
default_memory_pool(), std::move(input)));
Expand All @@ -190,11 +196,20 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
}));
return reader_fut.Then(
// Adds the filename to the error
[](const std::shared_ptr<csv::StreamingReader>& reader)
-> Result<std::shared_ptr<csv::StreamingReader>> { return reader; },
[source](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
return err.WithMessage("Could not open CSV input source '", source.path(),
"': ", err);
[=](const std::shared_ptr<csv::StreamingReader>& reader)
-> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return reader;
},
[=](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(err, span.get());
span->End();
#endif
return err.WithMessage("Could not open CSV input source '", path, "': ", err);
});
}

Expand Down Expand Up @@ -250,7 +265,10 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
auto source = file->source();
auto reader_fut =
OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
return generator;
}

Future<util::optional<int64_t>> CsvFileFormat::CountRows(
Expand Down
32 changes: 25 additions & 7 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down Expand Up @@ -62,16 +63,31 @@ static inline Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
static inline Future<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReaderAsync(
const FileSource& source,
const ipc::IpcReadOptions& options = default_read_options()) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::IpcFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
auto path = source.path();
return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options)
.Then([](const std::shared_ptr<ipc::RecordBatchFileReader>& reader)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> { return reader; },
[path](const Status& status)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
return status.WithMessage("Could not open IPC input source '", path,
"': ", status.message());
});
.Then(
[=](const std::shared_ptr<ipc::RecordBatchFileReader>& reader)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return reader;
},
[=](const Status& status)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(status, span.get());
span->End();
#endif
return status.WithMessage("Could not open IPC input source '", path,
"': ", status.message());
});
}

static inline Result<std::vector<int>> GetIncludedFields(
Expand Down Expand Up @@ -151,6 +167,8 @@ Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync(
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(
/*coalesce=*/false, options->io_context));
}
WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
generator, "arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next");
auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level);
return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size);
};
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
Expand Down Expand Up @@ -415,8 +416,11 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
::arrow::internal::GetCpuThreadPool(), row_group_readahead));
return generator;
};
return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
.Then(std::move(make_generator)));
auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
.Then(std::move(make_generator)));
WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
generator, "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next");
return generator;
}

Future<util::optional<int64_t>> ParquetFileFormat::CountRows(
Expand Down
26 changes: 22 additions & 4 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
#include "arrow/dataset/plan.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/config.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down Expand Up @@ -206,6 +208,18 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this<AsyncSc
Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
const Enumerated<std::shared_ptr<Fragment>>& fragment,
const std::shared_ptr<ScanOptions>& options) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan(
"arrow::dataset::FragmentToBatches",
{
{"arrow.dataset.fragment", fragment.value->ToString()},
{"arrow.dataset.fragment.index", fragment.index},
{"arrow.dataset.fragment.last", fragment.last},
{"arrow.dataset.fragment.type_name", fragment.value->type_name()},
});
auto scope = tracer->WithActiveSpan(span);
#endif
ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options));
ArrayVector columns;
for (const auto& field : options->dataset_schema->fields()) {
Expand All @@ -214,6 +228,7 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
MakeArrayOfNull(field->type(), /*length=*/0, options->pool));
columns.push_back(std::move(array));
}
WRAP_ASYNC_GENERATOR(batch_gen);
batch_gen = MakeDefaultIfEmptyGenerator(
std::move(batch_gen),
RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns)));
Expand All @@ -230,10 +245,13 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options) {
auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
return MakeMappedGenerator(std::move(enumerated_fragment_gen),
[=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
return FragmentToBatches(fragment, options);
});
auto batch_gen_gen =
MakeMappedGenerator(std::move(enumerated_fragment_gen),
[=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
return FragmentToBatches(fragment, options);
});
PROPAGATE_SPAN_TO_GENERATOR(std::move(batch_gen_gen));
return batch_gen_gen;
}

class OneShotFragment : public Fragment {
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/util/future.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down Expand Up @@ -242,6 +243,19 @@ class ConcreteFutureImpl : public FutureImpl {
void AddCallback(Callback callback, CallbackOptions opts) {
CheckOptions(opts);
std::unique_lock<std::mutex> lock(mutex_);
#ifdef ARROW_WITH_OPENTELEMETRY
struct SpanWrapper {
void operator()(const FutureImpl& impl) {
auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span);
std::move(func)(impl);
}
Callback func;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> active_span;
};
SpanWrapper wrapper{std::move(callback),
::arrow::internal::tracing::GetTracer()->GetCurrentSpan()};
callback = std::move(wrapper);
#endif
CallbackRecord callback_record{std::move(callback), opts};
if (IsFutureFinished(state_)) {
lock.unlock();
Expand Down
29 changes: 29 additions & 0 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"

#include "arrow/util/tracing_internal.h"

namespace arrow {
namespace internal {

Expand Down Expand Up @@ -58,6 +60,20 @@ SerialExecutor::~SerialExecutor() = default;

Status SerialExecutor::SpawnReal(TaskHints hints, FnOnce<void()> task,
StopToken stop_token, StopCallback&& stop_callback) {
#ifdef ARROW_WITH_OPENTELEMETRY
// Wrap the task to propagate a parent tracing span to it
struct SpanWrapper {
void operator()() {
auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(active_span);
std::move(func)();
}
FnOnce<void()> func;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> active_span;
};
SpanWrapper wrapper{std::move(task),
::arrow::internal::tracing::GetTracer()->GetCurrentSpan()};
task = std::move(wrapper);
#endif
// While the SerialExecutor runs tasks synchronously on its main thread,
// SpawnReal may be called from external threads (e.g. when transferring back
// from blocking I/O threads), so we need to keep the state alive *and* to
Expand Down Expand Up @@ -366,6 +382,19 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken sto
// We can still spin up more workers so spin up a new worker
LaunchWorkersUnlocked(/*threads=*/1);
}
#ifdef ARROW_WITH_OPENTELEMETRY
// Wrap the task to propagate a parent tracing span to it
struct {
void operator()() {
auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan);
std::move(func)();
}
FnOnce<void()> func;
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> activeSpan;
} wrapper{std::forward<FnOnce<void()>>(task),
::arrow::internal::tracing::GetTracer()->GetCurrentSpan()};
task = std::move(wrapper);
#endif
state_->pending_tasks_.push_back(
{std::move(task), std::move(stop_token), std::move(stop_callback)});
}
Expand Down
Loading