Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.segment.cache.DatasourceSegmentCache;
import org.apache.druid.metadata.segment.cache.Metric;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.query.DruidMetrics;
Expand Down Expand Up @@ -93,13 +92,12 @@ public <T> T inReadOnlyDatasourceTransaction(

// For read-only transactions, use cache only if it is already synced
if (segmentMetadataCache.isSyncedForRead()) {
final DatasourceSegmentCache datasourceCache
= segmentMetadataCache.getDatasource(dataSource);
final SegmentMetadataReadTransaction cachedTransaction
= new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector, true);

emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource);
return datasourceCache.read(() -> executeReadAndClose(cachedTransaction, callback));
return segmentMetadataCache.readCacheForDataSource(dataSource, dataSourceCache -> {
final SegmentMetadataReadTransaction cachedTransaction
= new CachedSegmentMetadataTransaction(sqlTransaction, dataSourceCache, leaderSelector, true);
return executeReadAndClose(cachedTransaction, callback);
});
} else {
return executeReadAndClose(sqlTransaction, callback);
}
Expand All @@ -121,17 +119,9 @@ public <T> T inReadWriteDatasourceTransaction(
= createSqlTransaction(dataSource, handle, status);

if (segmentMetadataCache.isEnabled()) {
final boolean isCacheReadyForRead = segmentMetadataCache.isSyncedForRead();
final DatasourceSegmentCache datasourceCache
= segmentMetadataCache.getDatasource(dataSource);
final SegmentMetadataTransaction cachedTransaction = new CachedSegmentMetadataTransaction(
sqlTransaction,
datasourceCache,
leaderSelector,
isCacheReadyForRead
);

if (isCacheReadyForRead) {
final boolean isSynced = segmentMetadataCache.isSyncedForRead();

if (isSynced) {
emitTransactionCount(Metric.READ_WRITE_TRANSACTIONS, dataSource);
} else {
log.warn(
Expand All @@ -142,7 +132,11 @@ public <T> T inReadWriteDatasourceTransaction(
emitTransactionCount(Metric.WRITE_ONLY_TRANSACTIONS, dataSource);
}

return datasourceCache.write(() -> executeWriteAndClose(cachedTransaction, callback));
return segmentMetadataCache.writeCacheForDataSource(dataSource, dataSourceCache -> {
final SegmentMetadataTransaction cachedTransaction =
new CachedSegmentMetadataTransaction(sqlTransaction, dataSourceCache, leaderSelector, isSynced);
return executeWriteAndClose(cachedTransaction, callback);
});
} else {
return executeWriteAndClose(sqlTransaction, callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,5 @@
*/
public interface DatasourceSegmentCache extends DatasourceSegmentMetadataWriter, DatasourceSegmentMetadataReader
{
/**
* Performs a thread-safe read action on the cache.
* Read actions can be concurrent with other reads but are mutually exclusive
* from other write actions.
*/
<T> T read(Action<T> action) throws Exception;

/**
* Performs a thread-safe write action on the cache.
* Write actions are mutually exclusive from other writes or reads.
*/
<T> T write(Action<T> action) throws Exception;

/**
* Represents a thread-safe read or write action performed on the cache within
* required locks.
*/
@FunctionalInterface
interface Action<T>
{
T perform() throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* In-memory cache for segments and pending segments of a single datasource.
*/
class HeapMemoryDatasourceSegmentCache extends ReadWriteCache
class HeapMemoryDatasourceSegmentCache extends ReadWriteCache implements AutoCloseable
{
private final String dataSource;

Expand All @@ -60,6 +61,13 @@ class HeapMemoryDatasourceSegmentCache extends ReadWriteCache
private final TreeMap<Interval, SegmentsInInterval> intervalToSegments
= new TreeMap<>(Comparators.intervalsByEndThenStart());

/**
* Number of transactions currently using this cache. This field is accessed
* without acquiring an explicit lock on this cache since the operations are
* always performed within a ConcurrentHashMap.compute() which is atomic.
*/
private final AtomicInteger references = new AtomicInteger(0);

HeapMemoryDatasourceSegmentCache(String dataSource)
{
super(true);
Expand All @@ -84,6 +92,30 @@ boolean isEmpty()
return withReadLock(intervalToSegments::isEmpty);
}

/**
 * Registers a new user of this cache. Every call must be balanced by a
 * corresponding {@link #close()} once the owning transaction has finished
 * reading or updating the cache.
 */
void acquireReference()
{
references.getAndIncrement();
}

/**
 * Releases a reference previously taken with {@link #acquireReference()}.
 * <p>
 * NOTE(review): there is no floor guard here — an unbalanced or repeated
 * close() would drive {@link #references} negative. Confirm every caller
 * pairs acquire/close exactly once (e.g. via try-with-resources).
 */
@Override
public void close()
{
references.decrementAndGet();
}

/**
 * Indicates whether any transaction currently holds a reference to this
 * cache, i.e. whether {@link #references} is positive. Used by the sync
 * thread to decide if an empty cache entry may be safely removed.
 */
boolean isBeingUsedByTransaction()
{
final int activeReferences = references.get();
return activeReferences > 0;
}

/**
* Checks if a record in the cache needs to be updated.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -80,11 +81,7 @@
* For cache usage modes, see {@link UsageMode}.
* <p>
* The map {@link #datasourceToSegmentCache} contains the cache for each datasource.
* Items are only added to this map and never removed. This is to avoid handling
* race conditions where a thread has invoked {@link #getDatasource} but hasn't
* acquired a lock on the returned cache yet while another thread sees this cache
* as empty and cleans it up. The first thread would then end up using a stopped
* cache, resulting in errors.
* If the cache for a datasource is empty, the sync thread removes it from the map.
*/
@ThreadSafe
public class HeapMemorySegmentMetadataCache implements SegmentMetadataCache
Expand Down Expand Up @@ -251,12 +248,66 @@ public boolean isSyncedForRead()
}

@Override
public DatasourceSegmentCache getDatasource(String dataSource)
public <T> T readCacheForDataSource(String dataSource, Action<T> readAction)
{
verifyCacheIsUsableAndAwaitSync();
return getCacheForDatasource(dataSource);
try (final HeapMemoryDatasourceSegmentCache datasourceCache = getCacheWithReference(dataSource)) {
return datasourceCache.withReadLock(
() -> {
try {
return readAction.perform(datasourceCache);
}
catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
);
}
}

@Override
public <T> T writeCacheForDataSource(String dataSource, Action<T> writeAction)
{
// Verify the cache can be used, waiting for the sync to complete if needed.
verifyCacheIsUsableAndAwaitSync();
// try-with-resources releases the reference taken by getCacheWithReference(),
// which allows the sync thread to remove the entry once it is empty and unused.
try (final HeapMemoryDatasourceSegmentCache datasourceCache = getCacheWithReference(dataSource)) {
return datasourceCache.withWriteLock(
() -> {
try {
return writeAction.perform(datasourceCache);
}
catch (Exception e) {
// Action.perform() declares a checked Exception but withWriteLock
// takes a Supplier: rethrow unchecked exceptions as-is, wrap the rest.
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
);
}
}

/**
 * Returns the (existing or new) cache instance for the given datasource and
 * acquires a single reference to it, which must be closed after the cache
 * has been read or updated.
 *
 * @param dataSource name of the datasource whose cache is needed
 * @return cache entry mapped for {@code dataSource}, with one reference taken
 */
private HeapMemoryDatasourceSegmentCache getCacheWithReference(String dataSource)
{
// compute() (rather than computeIfAbsent() followed by a separate acquire)
// takes the reference atomically with the map lookup. This closes the race
// where the sync thread removes an empty, unused entry between the lookup
// and acquireReference(), which would hand the caller a removed cache.
return datasourceToSegmentCache.compute(
dataSource,
(ds, existingCache) -> {
final HeapMemoryDatasourceSegmentCache newCache
= existingCache == null ? new HeapMemoryDatasourceSegmentCache(ds) : existingCache;
newCache.acquireReference();
return newCache;
}
);
}

/**
* Returns the (existing or new) cache instance for the given datasource.
* Similar to {@link #getCacheWithReference} but does not acquire references
* that need to be closed. This method should be called only by the sync thread.
*/
private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String dataSource)
{
return datasourceToSegmentCache.computeIfAbsent(dataSource, HeapMemoryDatasourceSegmentCache::new);
Expand Down Expand Up @@ -460,16 +511,36 @@ private long syncWithMetadataStore()
*/
private void markCacheSynced()
{
datasourceToSegmentCache.forEach((dataSource, cache) -> {
final Set<String> cachedDatasources = Set.copyOf(datasourceToSegmentCache.keySet());

for (String dataSource : cachedDatasources) {
final HeapMemoryDatasourceSegmentCache cache = datasourceToSegmentCache.getOrDefault(
dataSource,
new HeapMemoryDatasourceSegmentCache(dataSource)
);
final CacheStats stats = cache.markCacheSynced();

if (!cache.isEmpty()) {
if (cache.isEmpty()) {
// If the cache is empty and not currently in use, remove it from the map
datasourceToSegmentCache.compute(
dataSource,
(ds, existingCache) -> {
if (existingCache != null && existingCache.isEmpty()
&& !existingCache.isBeingUsedByTransaction()) {
emitMetric(dataSource, Metric.DELETED_DATASOURCES, 1L);
return null;
} else {
return existingCache;
}
}
);
} else {
emitMetric(dataSource, Metric.CACHED_INTERVALS, stats.getNumIntervals());
emitMetric(dataSource, Metric.CACHED_USED_SEGMENTS, stats.getNumUsedSegments());
emitMetric(dataSource, Metric.CACHED_UNUSED_SEGMENTS, stats.getNumUnusedSegments());
emitMetric(dataSource, Metric.CACHED_PENDING_SEGMENTS, stats.getNumPendingSegments());
}
});
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ private Metric()

// CACHE UPDATE METRICS

/**
 * Total number of datasources whose (empty, unused) cache entries were
 * removed from the cache in the latest sync.
 */
public static final String DELETED_DATASOURCES = METRIC_NAME_PREFIX + "dataSource/deleted";

/**
* Number of segments which are now stale in the cache and need to be refreshed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,13 @@ public boolean isSyncedForRead()
}

@Override
public DatasourceSegmentCache getDatasource(String dataSource)
public <T> T readCacheForDataSource(String dataSource, Action<T> readAction)
{
throw new UnsupportedOperationException();
}

// The no-op cache never serves transactions; callers must check isEnabled()
// before attempting a cached write.
@Override
public <T> T writeCacheForDataSource(String dataSource, Action<T> writeAction)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,6 @@ public <T> T withReadLock(Supplier<T> action)
}
}

@Override
public <T> T read(DatasourceSegmentCache.Action<T> action) throws Exception
{
stateLock.readLock().lock();
try {
verifyCacheIsNotStopped();
return action.perform();
}
finally {
stateLock.readLock().unlock();
}
}

@Override
public <T> T write(DatasourceSegmentCache.Action<T> action) throws Exception
{
stateLock.writeLock().lock();
try {
verifyCacheIsNotStopped();
return action.perform();
}
finally {
stateLock.writeLock().unlock();
}
}

private void verifyCacheIsNotStopped()
{
if (isStopped) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,28 @@ public interface SegmentMetadataCache
boolean isSyncedForRead();

/**
* Returns the cache for the given datasource.
* Performs a thread-safe read action on the cache for the given datasource.
* Read actions can be concurrent with other reads but are mutually exclusive
* from other write actions on the same datasource.
*/
DatasourceSegmentCache getDatasource(String dataSource);
<T> T readCacheForDataSource(String dataSource, Action<T> readAction);

/**
* Performs a thread-safe write action on the cache for the given datasource.
* Write actions are mutually exclusive from other writes or reads on the same
* datasource.
*/
<T> T writeCacheForDataSource(String dataSource, Action<T> writeAction);

/**
 * Represents a thread-safe read or write action performed on the cache within
 * required locks.
 *
 * @param <T> type of the result produced by the action
 */
@FunctionalInterface
interface Action<T>
{
/**
 * Performs the action against the cache of a single datasource.
 *
 * @param dataSourceCache cache of the target datasource; intended to be
 *                        used only for the duration of this call
 * @return result of the action
 * @throws Exception if the action fails; implementations may rethrow
 *                   checked exceptions wrapped as unchecked
 */
T perform(DatasourceSegmentCache dataSourceCache) throws Exception;
}

/**
* Cache usage modes.
Expand Down
Loading
Loading