Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions processing/src/main/java/io/druid/query/QueryRunnerHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
Expand All @@ -33,7 +34,9 @@
import io.druid.segment.StorageAdapter;
import org.joda.time.Interval;

import java.io.Closeable;
import java.util.List;
import java.util.Map;

/**
*/
Expand Down Expand Up @@ -81,4 +84,15 @@ public Result<T> apply(Cursor input)
Predicates.<Result<T>>notNull()
);
}

/**
 * Wraps a QueryRunner so that {@code closeable} is released once the sequence
 * returned by {@link QueryRunner#run} has been fully consumed (or its
 * iteration is closed early), via {@link ResourceClosingSequence}.
 *
 * @param runner    the runner whose result sequence should be wrapped
 * @param closeable the resource to close when the sequence is done
 *
 * @return a QueryRunner whose result sequences close {@code closeable} on completion
 */
public static <T> QueryRunner<T> makeClosingQueryRunner(final QueryRunner<T> runner, final Closeable closeable)
{
  return new QueryRunner<T>()
  {
    @Override
    public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
    {
      return new ResourceClosingSequence<>(runner.run(query, responseContext), closeable);
    }
  };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,41 @@
import java.util.Map;

/**
 */
public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunnerFactory<T, Query<T>> factory;
private final ReferenceCountingSegment adapter;
private final SegmentDescriptor descriptor;

public ReferenceCountingSegmentQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
ReferenceCountingSegment adapter
ReferenceCountingSegment adapter,
SegmentDescriptor descriptor
)
{
this.factory = factory;
this.adapter = adapter;
this.descriptor = descriptor;
}

@Override
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
final Closeable closeable = adapter.increment();
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);
if (closeable != null) {
try {
final Sequence<T> baseSequence = factory.createRunner(adapter).run(query, responseContext);

return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
return new ResourceClosingSequence<T>(baseSequence, closeable);
}
catch (RuntimeException e) {
CloseQuietly.close(closeable);
throw e;
}
} else {
// Segment was closed before we had a chance to increment the reference count
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(query, responseContext);
}
}
}
32 changes: 23 additions & 9 deletions server/src/main/java/io/druid/segment/realtime/FireHydrant.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
package io.druid.segment.realtime;

import com.google.common.base.Throwables;
import com.metamx.common.Pair;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;

import java.io.Closeable;
import java.io.IOException;

/**
Expand All @@ -34,6 +36,7 @@ public class FireHydrant
private final int count;
private volatile IncrementalIndex index;
private volatile ReferenceCountingSegment adapter;
private Object swapLock = new Object();

public FireHydrant(
IncrementalIndex index,
Expand Down Expand Up @@ -61,7 +64,7 @@ public IncrementalIndex getIndex()
return index;
}

public ReferenceCountingSegment getSegment()
public Segment getSegment()
{
return adapter;
}
Expand All @@ -78,16 +81,27 @@ public boolean hasSwapped()

public void swapSegment(Segment adapter)
{
if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
synchronized (swapLock) {
if (this.adapter != null) {
try {
this.adapter.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
this.adapter = new ReferenceCountingSegment(adapter);
this.index = null;
}
}

/**
 * Returns the current segment together with a Closeable that releases the
 * reference taken on it. Holding swapLock ensures the segment cannot be
 * swapped (and thus closed) between the increment and the return.
 * NOTE(review): the returned closeable may be null if the segment was already
 * closed — callers should check before relying on the reference.
 *
 * @return pair of (segment, reference-releasing closeable)
 */
public Pair<Segment, Closeable> getAndIncrementSegment()
{
  // Prevent swapping of index before increment is called
  synchronized (swapLock) {
    Closeable closeable = adapter.increment();
    return new Pair<Segment, Closeable>(adapter, closeable);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
Expand All @@ -55,6 +56,7 @@
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
Expand Down Expand Up @@ -327,47 +329,38 @@ public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> holder)
@Override
public QueryRunner<T> apply(FireHydrant input)
{
// It is possible that we got a query for a segment, and while that query
// is in the jetty queue, the segment is abandoned. Here, we need to retry
// the query for the segment.
if (input == null || input.getSegment() == null) {
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor);
}

if (skipIncrementalSegment && !input.hasSwapped()) {
return new NoopQueryRunner<T>();
}

// Prevent the underlying segment from closing when its being iterated
final ReferenceCountingSegment segment = input.getSegment();
final Closeable closeable = segment.increment();
// Prevent the underlying segment from swapping when its being iterated
final Pair<Segment, Closeable> segment = input.getAndIncrementSegment();
try {
QueryRunner<T> baseRunner = QueryRunnerHelper.makeClosingQueryRunner(
factory.createRunner(segment.lhs),
segment.rhs
);

if (input.hasSwapped() // only use caching if data is immutable
&& cache.isLocal() // hydrants may not be in sync between replicas, make sure cache is local
) {
return new CachingQueryRunner<>(
makeHydrantIdentifier(input, segment),
makeHydrantIdentifier(input, segment.lhs),
descriptor,
objectMapper,
cache,
toolchest,
factory.createRunner(segment),
baseRunner,
MoreExecutors.sameThreadExecutor(),
cacheConfig
);
} else {
return factory.createRunner(input.getSegment());
return baseRunner;
}
}
finally {
try {
if (closeable != null) {
closeable.close();
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
catch (RuntimeException e) {
CloseQuietly.close(segment.rhs);
throw e;
}
}
}
Expand All @@ -385,7 +378,7 @@ public QueryRunner<T> apply(FireHydrant input)
);
}

protected static String makeHydrantIdentifier(FireHydrant input, ReferenceCountingSegment segment)
protected static String makeHydrantIdentifier(FireHydrant input, Segment segment)
{
return segment.getIdentifier() + "_" + input.getCount();
}
Expand All @@ -406,12 +399,12 @@ public void persist(final Committer committer)
final Stopwatch persistStopwatch = Stopwatch.createStarted();

final Map<String, Object> metadataElems = committer.getMetadata() == null ? null :
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);

persistExecutor.execute(
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
return toolChest.makeMetricBuilder(input);
}
},
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter),
new ReferenceCountingSegmentQueryRunner<T>(factory, adapter, segmentDescriptor),
"query/segment/time",
ImmutableMap.of("segment", adapter.getIdentifier())
),
Expand Down