Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnConfig;
Expand Down Expand Up @@ -150,7 +151,7 @@ public void setup() throws IOException
0
);
hashJoinLookupStringKeySegment = new HashJoinSegment(
baseSegment,
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupStringKey,
preAnalysisLookupStringKey
);
Expand All @@ -177,7 +178,7 @@ public void setup() throws IOException
0
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
baseSegment,
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupLongKey,
preAnalysisLookupLongKey
);
Expand All @@ -204,7 +205,7 @@ public void setup() throws IOException
0
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
baseSegment,
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableStringKey,
preAnalysisIndexedTableStringKey
);
Expand All @@ -231,7 +232,7 @@ public void setup() throws IOException
0
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
baseSegment,
ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableLonggKey,
preAnalysisIndexedTableLongKey
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest
{
private static final Logger LOG = new Logger(ITUnionQueryTest.class);
private static final Logger LOG = new Logger(ITTestCoordinatorPausedTest.class);
private static final String INDEX_DATASOURCE = "wikipedia_index_test";
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.druid.tests.indexer;
package org.apache.druid.tests.query;

import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
Expand All @@ -39,6 +39,8 @@
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.ServerDiscoveryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
Expand All @@ -62,7 +64,7 @@ public class ITUnionQueryTest extends AbstractIndexerTest
private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json";
private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName";
private static final String UNION_DATA_FILE = "/data/union_query/wikipedia_index_data.json";
private static final String UNION_QUERIES_RESOURCE = "/indexer/union_queries.json";
private static final String UNION_QUERIES_RESOURCE = "/queries/union_queries.json";
private static final String UNION_DATASOURCE = "wikipedia_index_test";

@Inject
Expand Down Expand Up @@ -92,7 +94,7 @@ public void testUnionQuery() throws IOException
closer.register(unloader(fullDatasourceName + i));
}
try {
// Load 4 datasources with same dimensions
// Load 3 datasources with same dimensions
String task = setShutOffTime(
getResourceAsString(UNION_TASK_RESOURCE),
DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3))
Expand All @@ -117,6 +119,7 @@ public void testUnionQuery() throws IOException
() -> {
for (int i = 0; i < numTasks; i++) {
final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01");
// there are 10 rows, but query only covers the first 5
if (countRows < 5) {
LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery<?> query)
{
return DataSourceAnalysis.forDataSource(query.getDataSource())
.getBaseQuerySegmentSpec()
.orElse(query.getQuerySegmentSpec());
.orElseGet(query::getQuerySegmentSpec);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,50 +22,42 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.ReferenceCounter;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;

public class ReferenceCountingSegmentQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunnerFactory<T, Query<T>> factory;
private final Segment segment;
private final ReferenceCounter segmentReferenceCounter;
private final SegmentReference segment;
private final SegmentDescriptor descriptor;

public ReferenceCountingSegmentQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
Segment segment,
ReferenceCounter segmentReferenceCounter,
SegmentReference segment,
SegmentDescriptor descriptor
)
{
this.factory = factory;
this.segment = segment;
this.segmentReferenceCounter = segmentReferenceCounter;
this.descriptor = descriptor;
}

@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, ResponseContext responseContext)
{
if (segmentReferenceCounter.increment()) {
return segment.acquireReferences().map(closeable -> {
try {
final Sequence<T> baseSequence = factory.createRunner(segment).run(queryPlus, responseContext);

return Sequences.withBaggage(baseSequence, segmentReferenceCounter.decrementOnceCloseable());
return Sequences.withBaggage(baseSequence, closeable);
}
catch (Throwable t) {
try {
segmentReferenceCounter.decrement();
closeable.close();
}
catch (Exception e) {
t.addSuppressed(e);
}
throw t;
}
} else {
// Segment was closed before we had a chance to increment the reference count
return new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(queryPlus, responseContext);
}
}).orElseGet(() -> new ReportTimelineMissingSegmentQueryRunner<T>(descriptor).run(queryPlus, responseContext));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,11 @@

package org.apache.druid.segment;

import javax.annotation.Nullable;

/**
* @deprecated use {@link Segment} directly as this does nothing
*/
@Deprecated
public abstract class AbstractSegment implements Segment
{
@Override
@Nullable
public <T> T as(Class<T> clazz)
{
if (clazz.equals(QueryableIndex.class)) {
return (T) asQueryableIndex();
} else if (clazz.equals(StorageAdapter.class)) {
return (T) asStorageAdapter();
}
return null;
}
// i used to have a purpose
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
*/
public class IncrementalIndexSegment extends AbstractSegment
public class IncrementalIndexSegment implements Segment
{
private final IncrementalIndex index;
private final SegmentId segmentId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
*/
public class QueryableIndexSegment extends AbstractSegment
public class QueryableIndexSegment implements Segment
{
private final QueryableIndex index;
private final QueryableIndexStorageAdapter storageAdapter;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment;

import java.io.Closeable;
import java.util.Optional;

/**
* Interface for an object that may have a reference acquired in the form of a {@link Closeable}. This is intended to be
* used with an implementation of {@link ReferenceCountingCloseableObject}, or anything else that wishes to provide
* a method to account for the acquire and release of a reference to the object.
*/
public interface ReferenceCountedObject
{
/**
* This method is expected to increment a reference count and provide a {@link Closeable} that decrements the
* reference count when closed. This is likely just a wrapper around
* {@link ReferenceCountingCloseableObject#incrementReferenceAndDecrementOnceCloseable()}, but may also include any
* other associated references which should be incremented when this method is called, and decremented/released by the
* closeable.
*
* IMPORTANT NOTE: to fulfill the contract of this method, implementors must return a closeable to indicate that the
* reference can be acquired, even if there is nothing to close. Implementors should avoid allowing this method or the
* {@link Closeable} it creates to throw exceptions.
*
* For callers: if this method returns non-empty, IT MUST BE CLOSED, else reference counts can potentially leak.
*/
Optional<Closeable> acquireReferences();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment;

import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;

import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* ReferenceCountingCloseableObject implements something like automatic reference count-based resource management,
* backed by a {@link Phaser}.
*
* ReferenceCountingCloseableObject allows consumers to call {@link #close()} before some other "users", which called
* {@link #increment()} or {@link #incrementReferenceAndDecrementOnceCloseable()}, but have not called
* {@link #decrement()} yet or the closer for {@link #incrementReferenceAndDecrementOnceCloseable()}, and the wrapped
* object won't be actually closed until that all references are released.
*/
public abstract class ReferenceCountingCloseableObject<BaseObject extends Closeable> implements Closeable
{
private static final Logger log = new Logger(ReferenceCountingCloseableObject.class);

private final AtomicBoolean closed = new AtomicBoolean(false);
private final Phaser referents = new Phaser(1)
{
@Override
protected boolean onAdvance(int phase, int registeredParties)
{
// Ensure that onAdvance() doesn't throw exception, otherwise termination won't happen
if (registeredParties != 0) {
log.error("registeredParties[%s] is not 0", registeredParties);
}
try {
baseObject.close();
}
catch (Exception e) {
try {
log.error(e, "Exception while closing reference counted object[%s]", baseObject);
}
catch (Exception e2) {
// ignore
}
}
// Always terminate.
return true;
}
};

protected final BaseObject baseObject;

public ReferenceCountingCloseableObject(BaseObject object)
{
this.baseObject = object;
}

public int getNumReferences()
{
return Math.max(referents.getRegisteredParties() - 1, 0);
}

public boolean isClosed()
{
return referents.isTerminated();
}

/**
* Increment the reference count by one.
*/
public boolean increment()
{
// Negative return from referents.register() means the Phaser is terminated.
return referents.register() >= 0;
}

/**
* Decrement the reference count by one.
*/
public void decrement()
{
referents.arriveAndDeregister();
}

/**
* Returns an {@link Optional} of a {@link Closeable} from {@link #decrementOnceCloseable}, if it is able to
* successfully {@link #increment}, else nothing indicating that the reference could not be acquired.
*/
public Optional<Closeable> incrementReferenceAndDecrementOnceCloseable()
{
final Closer closer;
if (increment()) {
closer = Closer.create();
closer.register(decrementOnceCloseable());
} else {
closer = null;
}
return Optional.ofNullable(closer);
}

/**
* Returns a {@link Closeable} which action is to call {@link #decrement()} only once. If close() is called on the
* returned Closeable object for the second time, it won't call {@link #decrement()} again.
*/
public Closeable decrementOnceCloseable()
{
AtomicBoolean decremented = new AtomicBoolean(false);
return () -> {
if (decremented.compareAndSet(false, true)) {
decrement();
} else {
log.warn("close() is called more than once on ReferenceCountingCloseableObject.decrementOnceCloseable()");
}
};
}

@Override
public void close()
{
if (closed.compareAndSet(false, true)) {
referents.arriveAndDeregister();
} else {
log.warn("close() is called more than once on ReferenceCountingCloseableObject");
}
}
}
Loading