diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
index 0ec525ead4a5..ce1582182d66 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
@@ -108,7 +108,7 @@ public Object get(final ByteBuffer buf, final int position)
final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
lock.lock();
try {
- return sketchCache.get(buf).get(position);
+ return sketchCache.get(buf).get(position).copy();
}
finally {
lock.unlock();
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
index ead9a6aa2807..609a46e01c07 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
@@ -69,7 +69,7 @@ public synchronized void aggregate(final ByteBuffer buffer, final int position)
@Override
public synchronized Object get(final ByteBuffer buffer, final int position)
{
- return sketches.get(buffer).get(position);
+ return sketches.get(buffer).get(position).compact();
}
@Override
diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
index 74def15c0626..ff866f9ffd65 100644
--- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
+++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java
@@ -66,7 +66,11 @@ public Object get(ByteBuffer buf, int position)
// | k (byte) | numLongs (int) | bitset (long[numLongs]) |
int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES);
mutationBuffer.limit(position + sizeBytes);
- return mutationBuffer.slice();
+
+ ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
+ resultCopy.put(mutationBuffer.slice());
+ resultCopy.rewind();
+ return resultCopy;
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
index ecd0c11b526a..ed77c912c736 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
@@ -72,7 +72,16 @@ public interface BufferAggregator extends HotLoopCallee
*
* Converts the given byte buffer representation into an intermediate aggregate Object
*
- * Implementations must not change the position, limit or mark of the given buffer
+ * Implementations must not change the position, limit or mark of the given buffer.
+ *
+ *
+ * The object returned must not have any references to the given buffer (i.e., make a copy), since the
+ * underlying buffer is a shared resource and may be given to another processing thread
+ * while the objects returned by this aggregator are still in use.
+ *
+ *
+ * If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator
+ * expects its inputs to be mutable, then the object returned by this method must be mutable.
*
* @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the aggregate value is stored