Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 2 additions & 33 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,23 +262,6 @@ struct AnalysisDataProcessorBuilder {
}
}

template <typename Z>
auto changeShifts()
{
constexpr auto index = framework::has_type_at_v<Z>(associated_pack_t{});
if (unassignedGroups[index] > 0) {
uint64_t pos;
if constexpr (soa::is_soa_filtered_t<std::decay_t<G>>::value) {
pos = (*groupSelection)[position];
} else {
pos = position;
}
if ((idValues[index])[pos] < 0) {
++shifts[index];
}
}
}

GroupSlicerIterator(G& gt, std::tuple<A...>& at)
: mAt{&at},
mGroupingElement{gt.begin()},
Expand All @@ -299,14 +282,12 @@ struct AnalysisDataProcessorBuilder {
x.asArrowTable(),
static_cast<int32_t>(gt.tableSize()),
&groups[index],
&idValues[index],
&offsets[index]);
if (result.ok() == false) {
throw runtime_error("Cannot split collection");
}
unassignedGroups[index] = std::count_if(idValues[index].begin(), idValues[index].end(), [](auto&& x) { return x < 0; });
if ((groups[index].size() - unassignedGroups[index]) > gt.tableSize()) {
throw runtime_error_f("Splitting collection resulted in a larger group number (%d, %d of them unassigned) than there is rows in the grouping table (%d).", groups[index].size(), unassignedGroups[index], gt.tableSize());
if (groups[index].size() > gt.tableSize()) {
throw runtime_error_f("Splitting collection resulted in a larger group number (%d) than there is rows in the grouping table (%d).", groups[index].size(), gt.tableSize());
};
}
};
Expand All @@ -331,8 +312,6 @@ struct AnalysisDataProcessorBuilder {
(extractor(x), ...);
},
at);

(changeShifts<A>(), ...);
}

template <typename B, typename... C>
Expand Down Expand Up @@ -410,12 +389,6 @@ struct AnalysisDataProcessorBuilder {
} else {
pos = position;
}
if (unassignedGroups[index] > 0) {
if ((idValues[index])[pos + shifts[index]] < 0) {
++shifts[index];
}
pos += shifts[index];
}
if constexpr (soa::is_soa_filtered_t<std::decay_t<A1>>::value) {
auto groupedElementsTable = arrow::util::get<std::shared_ptr<arrow::Table>>(((groups[index])[pos]).value);

Expand Down Expand Up @@ -446,14 +419,10 @@ struct AnalysisDataProcessorBuilder {
typename grouping_t::iterator mGroupingElement;
uint64_t position = 0;
soa::SelectionVector const* groupSelection = nullptr;

std::array<std::vector<arrow::Datum>, sizeof...(A)> groups;
std::array<std::vector<int32_t>, sizeof...(A)> idValues;
std::array<std::vector<uint64_t>, sizeof...(A)> offsets;
std::array<soa::SelectionVector const*, sizeof...(A)> selections;
std::array<soa::SelectionVector::const_iterator, sizeof...(A)> starts;
std::array<int, sizeof...(A)> unassignedGroups{0};
std::array<int, sizeof...(A)> shifts{0};
};

GroupSlicerIterator& begin()
Expand Down
77 changes: 42 additions & 35 deletions Framework/Core/include/Framework/Kernels.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ namespace o2::framework
/// @a offset the offset in the original table at which the corresponding
/// slice was split.
template <typename T>
auto sliceByColumn(char const* key,
std::shared_ptr<arrow::Table> const& input,
T fullSize,
std::vector<arrow::Datum>* slices,
std::vector<int32_t>* vals = nullptr,
std::vector<uint64_t>* offsets = nullptr)
auto sliceByColumn(
char const* key,
std::shared_ptr<arrow::Table> const& input,
T fullSize,
std::vector<arrow::Datum>* slices,
std::vector<uint64_t>* offsets = nullptr,
std::vector<arrow::Datum>* unassignedSlices = nullptr,
std::vector<uint64_t>* unassignedOffsets = nullptr)
{
arrow::Datum value_counts;
auto options = arrow::compute::CountOptions::Defaults();
Expand All @@ -47,53 +49,58 @@ auto sliceByColumn(char const* key,
auto counts = static_cast<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());

// create slices and offsets
auto offset = 0;
uint64_t offset = 0;
uint64_t unassignedOffset = 0;
auto count = 0;

auto size = values.length();
if (vals != nullptr) {
for (auto i = 0; i < size; ++i) {
vals->push_back(values.Value(i));
}
}

auto makeSlice = [&](T count) {
slices->emplace_back(arrow::Datum{input->Slice(offset, count)});
auto makeSlice = [&](uint64_t offset_, T count_) {
slices->emplace_back(arrow::Datum{input->Slice(offset_, count_)});
if (offsets) {
offsets->emplace_back(offset);
offsets->emplace_back(offset_);
}
};

auto current = 0;
auto v = values.Value(0);
while (v - current >= 1) {
makeSlice(0);
++current;
}
auto makeUnassignedSlice = [&](uint64_t offset_, T count_) {
if (unassignedSlices) {
unassignedSlices->emplace_back(arrow::Datum{input->Slice(offset_, count_)});
}
if (unassignedOffsets) {
unassignedOffsets->emplace_back(offset_);
}
};

for (auto r = 0; r < size - 1; ++r) {
count = counts.Value(r);
makeSlice(count);
offset += count;
auto nextValue = values.Value(r + 1);
auto value = values.Value(r);
while (nextValue - value > 1) {
makeSlice(0);
++value;
auto v = 0;
auto vprev = v;
auto nzeros = 0;

for (auto i = 0; i < size; ++i) {
count = counts.Value(i);
if (v >= 0) {
vprev = v;
}
v = values.Value(i);
if (v < 0) {
makeUnassignedSlice(offset, count);
offset += count;
continue;
}
nzeros = v - vprev - ((i == 0) ? 0 : 1);
for (auto z = 0; z < nzeros; ++z) {
makeSlice(offset, 0);
}
makeSlice(offset, count);
offset += count;
}
makeSlice(counts.Value(size - 1));
offset += counts.Value(size - 1);

if (values.Value(size - 1) < fullSize - 1) {
for (auto v = values.Value(size - 1) + 1; v < fullSize; ++v) {
makeSlice(0);
makeSlice(offset, 0);
}
}

return arrow::Status::OK();
}

} // namespace o2::framework

#endif // O2_FRAMEWORK_KERNELS_H_
2 changes: 1 addition & 1 deletion Framework/Core/test/test_GroupSlicer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ BOOST_AUTO_TEST_CASE(ArrowDirectSlicing)

std::vector<arrow::Datum> slices;
std::vector<uint64_t> offsts;
auto status = sliceByColumn("fID", b_e.asArrowTable(), 20, &slices, nullptr, &offsts);
auto status = sliceByColumn("fID", b_e.asArrowTable(), 20, &slices, &offsts);
for (auto i = 0u; i < 5; ++i) {
auto tbl = arrow::util::get<std::shared_ptr<arrow::Table>>(slices[i].value);
auto ca = tbl->GetColumnByName("fArr");
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_Kernels.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ BOOST_AUTO_TEST_CASE(TestSlicingFramework)

std::vector<uint64_t> offsets;
std::vector<arrow::Datum> slices;
auto status = sliceByColumn<int32_t>("x", table, 12, &slices, nullptr, &offsets);
auto status = sliceByColumn<int32_t>("x", table, 12, &slices, &offsets);
BOOST_REQUIRE(status.ok());
BOOST_REQUIRE_EQUAL(slices.size(), 12);
std::array<int, 12> sizes{0, 4, 1, 0, 1, 2, 0, 0, 0, 0, 0, 0};
Expand Down