Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/build-support/lsan-suppressions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,10 @@
leak:*__new_exitfn*
# Leak at shutdown in OpenSSL
leak:CRYPTO_zalloc

# OpenTelemetry. These seem like false positives and go away if the
# CPU thread pool is manually shut down before exit.
# Note that ASan has trouble backtracing these and may only be able to
# do so with LSAN_OPTIONS=fast_unwind_on_malloc=0:malloc_context_size=100
leak:opentelemetry::v1::context::ThreadLocalContextStorage::GetStack
leak:opentelemetry::v1::context::ThreadLocalContextStorage::Stack::Resize
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "arrow/util/map.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/variant.h"

namespace arrow {
Expand Down
32 changes: 26 additions & 6 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/utf8.h"

namespace arrow {
Expand Down Expand Up @@ -167,9 +168,14 @@ static inline Result<csv::ReadOptions> GetReadOptions(
static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, Executor* cpu_executor) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));

ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
auto path = source.path();
ARROW_ASSIGN_OR_RAISE(
input, io::BufferedInputStream::Create(reader_options.block_size,
default_memory_pool(), std::move(input)));
Expand All @@ -190,11 +196,20 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
}));
return reader_fut.Then(
// Adds the filename to the error
[](const std::shared_ptr<csv::StreamingReader>& reader)
-> Result<std::shared_ptr<csv::StreamingReader>> { return reader; },
[source](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
return err.WithMessage("Could not open CSV input source '", source.path(),
"': ", err);
[=](const std::shared_ptr<csv::StreamingReader>& reader)
-> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return reader;
},
[=](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(err, span.get());
span->End();
#endif
return err.WithMessage("Could not open CSV input source '", path, "': ", err);
});
}

Expand Down Expand Up @@ -250,7 +265,12 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
auto source = file->source();
auto reader_fut =
OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
return GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
#ifdef ARROW_WITH_OPENTELEMETRY
generator = arrow::internal::tracing::WrapAsyncGenerator(
std::move(generator), "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
#endif
return generator;
}

Future<util::optional<int64_t>> CsvFileFormat::CountRows(
Expand Down
41 changes: 34 additions & 7 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down Expand Up @@ -62,16 +63,31 @@ static inline Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
static inline Future<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReaderAsync(
const FileSource& source,
const ipc::IpcReadOptions& options = default_read_options()) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::IpcFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
auto path = source.path();
return ipc::RecordBatchFileReader::OpenAsync(std::move(input), options)
.Then([](const std::shared_ptr<ipc::RecordBatchFileReader>& reader)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> { return reader; },
[path](const Status& status)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
return status.WithMessage("Could not open IPC input source '", path,
"': ", status.message());
});
.Then(
[=](const std::shared_ptr<ipc::RecordBatchFileReader>& reader)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return reader;
},
[=](const Status& status)
-> Result<std::shared_ptr<ipc::RecordBatchFileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(status, span.get());
span->End();
#endif
return status.WithMessage("Could not open IPC input source '", path,
"': ", status.message());
});
}

static inline Result<std::vector<int>> GetIncludedFields(
Expand Down Expand Up @@ -121,6 +137,10 @@ Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source)
Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto parent_span = tracer->GetCurrentSpan();
#endif
auto self = shared_from_this();
auto source = file->source();
auto open_reader = OpenReaderAsync(source);
Expand Down Expand Up @@ -151,6 +171,13 @@ Result<RecordBatchGenerator> IpcFileFormat::ScanBatchesAsync(
ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator(
/*coalesce=*/false, options->io_context));
}
#ifdef ARROW_WITH_OPENTELEMETRY
opentelemetry::trace::StartSpanOptions span_options;
span_options.parent = parent_span->GetContext();
generator = arrow::internal::tracing::WrapAsyncGenerator(
std::move(generator), std::move(span_options),
"arrow::dataset::IpcFileFormat::ScanBatchesAsync::Next");
#endif
auto batch_generator = MakeReadaheadGenerator(std::move(generator), readahead_level);
return MakeChunkedBatchGenerator(std::move(batch_generator), options->batch_size);
};
Expand Down
25 changes: 21 additions & 4 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/writer.h"
Expand Down Expand Up @@ -366,6 +367,10 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader

Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
const FileSource& source, const std::shared_ptr<ScanOptions>& options) const {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::ParquetFileFormat::GetReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(
auto parquet_scan_options,
GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
Expand Down Expand Up @@ -398,10 +403,17 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
RETURN_NOT_OK(parquet::arrow::FileReader::Make(options->pool, std::move(reader),
std::move(arrow_properties),
&arrow_reader));
#ifdef ARROW_WITH_OPENTELEMETRY
span->SetStatus(opentelemetry::trace::StatusCode::kOk);
span->End();
#endif
return std::move(arrow_reader);
},
[path](
const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
[=](const Status& status) -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
#ifdef ARROW_WITH_OPENTELEMETRY
arrow::internal::tracing::MarkSpan(status, span.get());
span->End();
#endif
return WrapSourceError(status, path);
});
}
Expand Down Expand Up @@ -448,8 +460,13 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
::arrow::internal::GetCpuThreadPool(), row_group_readahead));
return generator;
};
return MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
.Then(std::move(make_generator)));
auto generator = MakeFromFuture(GetReaderAsync(parquet_fragment->source(), options)
.Then(std::move(make_generator)));
#ifdef ARROW_WITH_OPENTELEMETRY
generator = arrow::internal::tracing::WrapAsyncGenerator(
std::move(generator), "arrow::dataset::ParquetFileFormat::ScanBatchesAsync::Next");
#endif
return generator;
}

Future<util::optional<int64_t>> ParquetFileFormat::CountRows(
Expand Down
32 changes: 28 additions & 4 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
#include "arrow/dataset/plan.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/config.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {

Expand Down Expand Up @@ -126,6 +128,18 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this<AsyncSc
Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
const Enumerated<std::shared_ptr<Fragment>>& fragment,
const std::shared_ptr<ScanOptions>& options) {
#ifdef ARROW_WITH_OPENTELEMETRY
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan(
"arrow::dataset::FragmentToBatches",
{
{"arrow.dataset.fragment", fragment.value->ToString()},
{"arrow.dataset.fragment.index", fragment.index},
{"arrow.dataset.fragment.last", fragment.last},
{"arrow.dataset.fragment.type_name", fragment.value->type_name()},
});
auto scope = tracer->WithActiveSpan(span);
#endif
ARROW_ASSIGN_OR_RAISE(auto batch_gen, fragment.value->ScanBatchesAsync(options));
ArrayVector columns;
for (const auto& field : options->dataset_schema->fields()) {
Expand All @@ -134,6 +148,10 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
MakeArrayOfNull(field->type(), /*length=*/0, options->pool));
columns.push_back(std::move(array));
}
#ifdef ARROW_WITH_OPENTELEMETRY
batch_gen =
arrow::internal::tracing::TieSpanToAsyncGenerator(std::move(batch_gen), span);
#endif
batch_gen = MakeDefaultIfEmptyGenerator(
std::move(batch_gen),
RecordBatch::Make(options->dataset_schema, /*num_rows=*/0, std::move(columns)));
Expand All @@ -150,10 +168,16 @@ Result<EnumeratedRecordBatchGenerator> FragmentToBatches(
Result<AsyncGenerator<EnumeratedRecordBatchGenerator>> FragmentsToBatches(
FragmentGenerator fragment_gen, const std::shared_ptr<ScanOptions>& options) {
auto enumerated_fragment_gen = MakeEnumeratedGenerator(std::move(fragment_gen));
return MakeMappedGenerator(std::move(enumerated_fragment_gen),
[=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
return FragmentToBatches(fragment, options);
});
auto batch_gen_gen =
MakeMappedGenerator(std::move(enumerated_fragment_gen),
[=](const Enumerated<std::shared_ptr<Fragment>>& fragment) {
return FragmentToBatches(fragment, options);
});
#ifdef ARROW_WITH_OPENTELEMETRY
batch_gen_gen = arrow::internal::tracing::PropagateSpanThroughAsyncGenerator(
std::move(batch_gen_gen));
#endif
return batch_gen_gen;
}

const FieldVector kAugmentedFields{
Expand Down
55 changes: 54 additions & 1 deletion cpp/src/arrow/util/tracing_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ Iterator<T> WrapIterator(

template <typename T>
AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
opentelemetry::trace::StartSpanOptions options,
const std::string& span_name) {
return [=]() mutable -> Future<T> {
auto span = GetTracer()->StartSpan(span_name);
auto span = GetTracer()->StartSpan(span_name, {}, options);
auto scope = GetTracer()->WithActiveSpan(span);
auto fut = wrapped();
fut.AddCallback([span](const Result<T>& result) {
Expand All @@ -101,6 +102,58 @@ AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
};
}

/// \brief Start a new span for each invocation of a generator.
///
/// Convenience overload that builds the span options itself: the parent
/// recorded in those options is the context of the span that is current
/// (if any) at the moment WrapAsyncGenerator is called, rather than at the
/// moment the returned generator is later invoked.
///
/// \param wrapped the generator to wrap
/// \param span_name the name passed on for the spans to be started
/// \return the result of the WrapAsyncGenerator overload that takes explicit
///         StartSpanOptions
template <typename T>
AsyncGenerator<T> WrapAsyncGenerator(AsyncGenerator<T> wrapped,
                                     const std::string& span_name) {
  // Snapshot the current span now so that the parent is fixed at wrap time.
  auto current_span = GetTracer()->GetCurrentSpan();
  opentelemetry::trace::StartSpanOptions span_options;
  span_options.parent = current_span->GetContext();
  return WrapAsyncGenerator(std::move(wrapped), std::move(span_options), span_name);
}

/// \brief Tie the given span to the given async generator.
///
/// Each time the returned generator is called, the span is made the active
/// span for the duration of the call into `wrapped`. When the resulting
/// future completes, the span's status is updated: it is set to kOk on
/// success, or marked through MarkSpan() with the failing Status on error.
/// The result or error is then forwarded unchanged.
///
/// NOTE(review): this function never calls span->End() itself; presumably
/// the span ends once the last copy held by the generator and its callbacks
/// is released -- confirm against the OpenTelemetry SDK in use.
///
/// \param wrapped the generator to wrap
/// \param span the span to activate and to update with each outcome
template <typename T>
AsyncGenerator<T> TieSpanToAsyncGenerator(
    AsyncGenerator<T> wrapped,
    opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span) {
  return [wrapped, span]() mutable -> Future<T> {
    // Keep the span active while the wrapped generator is being invoked.
    auto active_scope = GetTracer()->WithActiveSpan(span);
    auto next = wrapped();

    auto on_success = [span](const T& value) -> Result<T> {
      span->SetStatus(opentelemetry::trace::StatusCode::kOk);
      return value;
    };
    auto on_failure = [span](const Status& error) -> Result<T> {
      MarkSpan(error, span.get());
      return error;
    };
    return next.Then(std::move(on_success), std::move(on_failure));
  };
}

/// \brief Activate the given span on each invocation of an async generator.
///
/// The returned generator keeps its own copy of `span` and makes it the
/// active span for the duration of every call into `wrapped`. The future
/// produced by `wrapped` is returned as is.
///
/// \param wrapped the generator to wrap
/// \param span the span to make active around each call
template <typename T>
AsyncGenerator<T> PropagateSpanThroughAsyncGenerator(
    AsyncGenerator<T> wrapped,
    opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span) {
  return [wrapped, span]() mutable -> Future<T> {
    // The scope object restores the previous active span when it is destroyed.
    auto active_scope = GetTracer()->WithActiveSpan(span);
    auto next = wrapped();
    return next;
  };
}

/// \brief Activate the currently active span on each invocation of an async
/// generator.
///
/// The span that is current when this function is called is captured and
/// handed to the overload taking an explicit span. If that span's context is
/// not valid, `wrapped` is returned unchanged.
///
/// \param wrapped the generator to wrap
template <typename T>
AsyncGenerator<T> PropagateSpanThroughAsyncGenerator(AsyncGenerator<T> wrapped) {
  auto active_span = GetTracer()->GetCurrentSpan();
  const bool has_valid_context = active_span->GetContext().IsValid();
  if (has_valid_context) {
    return PropagateSpanThroughAsyncGenerator(std::move(wrapped),
                                              std::move(active_span));
  }
  // Nothing to propagate: hand back the generator untouched.
  return wrapped;
}

class SpanImpl {
public:
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span;
Expand Down
18 changes: 18 additions & 0 deletions cpp/valgrind.supp
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,21 @@
...
fun:*re2*Prog*
}
{
<OpenTelemetry>:Thread locals don't appear to be freed
Memcheck:Leak
...
fun:*opentelemetry*ThreadLocalContextStorage*GetStack*
}
{
<OpenTelemetry>:Thread locals don't appear to be freed
Memcheck:Leak
...
fun:*opentelemetry*ThreadLocalContextStorage*Stack*Resize*
}
{
<OpenTelemetry>:Thread locals don't appear to be freed
Memcheck:Leak
...
fun:_dl_allocate_tls
}