Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
Expand Down Expand Up @@ -87,12 +88,7 @@ public OutType call() throws Exception
return baseSequence.accumulate(initValue, accumulator);
}
catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
appendMissingSegment(responseContext);
return initValue;
}
}
Expand All @@ -112,7 +108,13 @@ public <OutType> Yielder<OutType> toYielder(
@Override
public Yielder<OutType> call() throws Exception
{
return makeYielder(baseSequence.toYielder(initValue, accumulator));
try {
return makeYielder(baseSequence.toYielder(initValue, accumulator));
}
catch (SegmentMissingException e) {
appendMissingSegment(responseContext);
return Yielders.done(initValue, null);
}
}
}
);
Expand Down Expand Up @@ -164,6 +166,16 @@ private <RetType> RetType doItNamed(Callable<RetType> toRun)
};
}

private void appendMissingSegment(Map<String, Object> responseContext)
{
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
}

private <RetType> RetType doNamed(Thread currThread, String currName, String newName, Callable<RetType> toRun)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Yielder<Object> toYielder(
Object initValue, YieldingAccumulator accumulator
)
{
return null;
throw new SegmentMissingException("FAILSAUCE");
}
};

Expand All @@ -94,7 +94,8 @@ public Yielder<Object> toYielder(
)
);

final Map<String, Object> responseContext = Maps.newHashMap();
// from accumulate
Map<String, Object> responseContext = Maps.newHashMap();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.granularity(QueryGranularities.ALL)
Expand All @@ -105,24 +106,26 @@ public Yielder<Object> toYielder(
)
)
.build();
Sequence results = queryRunner.run(
query,
responseContext
);
Sequence results = queryRunner.run(query, responseContext);
Sequences.toList(results, Lists.newArrayList());
validate(mapper, descriptor, responseContext);

Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);

Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List);

Object segmentDesc = ((List) missingSegments).get(0);

Assert.assertTrue(segmentDesc instanceof SegmentDescriptor);

SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class);

Assert.assertEquals(descriptor, newDesc);
// from toYielder
responseContext = Maps.newHashMap();
results = queryRunner.run(query, responseContext);
results.toYielder(
null, new YieldingAccumulator()
{
final List lists = Lists.newArrayList();
@Override
public Object accumulate(Object accumulated, Object in)
{
lists.add(in);
return in;
}
}
);
validate(mapper, descriptor, responseContext);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -195,6 +198,12 @@ public void run()

Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows"));

validate(mapper, descriptor, responseContext);
}

private void validate(ObjectMapper mapper, SegmentDescriptor descriptor, Map<String, Object> responseContext)
throws java.io.IOException
{
Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);

Assert.assertTrue(missingSegments != null);
Expand Down