Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions common/src/main/java/io/druid/concurrent/LifecycleLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ protected boolean tryReleaseShared(int ignore)
}
}

/** Returns whether the sync's state records a successful start (state == START_EXITED_SUCCESSFUL). */
boolean isStarted()
{
  final int currentState = getState();
  return currentState == START_EXITED_SUCCESSFUL;
}

boolean awaitStarted()
{
try {
Expand All @@ -124,7 +129,7 @@ boolean awaitStarted()
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return getState() == START_EXITED_SUCCESSFUL;
return isStarted();
}

boolean awaitStarted(long timeNanos)
Expand All @@ -138,7 +143,7 @@ boolean awaitStarted(long timeNanos)
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return getState() == START_EXITED_SUCCESSFUL;
return isStarted();
}

@Override
Expand Down Expand Up @@ -210,6 +215,15 @@ public void exitStart()
sync.exitStart();
}

/**
 * Returns {@code true} if {@link #started()} was called before {@link #exitStart()}. Returns {@code false} if
 * {@link #started()} was not called before {@link #exitStart()}, or if {@link #canStop()} is already called on this
 * LifecycleLock.
 */
public boolean isStarted()
{
  // Delegate to the internal sync, which checks the current state against START_EXITED_SUCCESSFUL.
  return sync.isStarted();
}

/**
* Awaits until {@link #exitStart()} is called, if needed, and returns {@code true} if {@link #started()} was called
* before that. Returns {@code false} if {@link #started()} is not called before {@link #exitStart()}, or if {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ public void stop()
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
lifecycleLock.exitStop();
}
}

@Override
Expand Down
187 changes: 80 additions & 107 deletions server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.DruidDataSource;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.concurrent.LifecycleLock;
import io.druid.guice.ManageLifecycle;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.timeline.DataSegment;
Expand Down Expand Up @@ -82,20 +83,17 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
private static final EmittingLogger log = new EmittingLogger(SQLMetadataSegmentManager.class);


private final Object lock = new Object();
private final LifecycleLock lifecycleLock = new LifecycleLock();

private final ObjectMapper jsonMapper;
private final Supplier<MetadataSegmentManagerConfig> config;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSourcesRef;
private final SQLMetadataConnector connector;

private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;

private volatile boolean started = false;

@Inject
public SQLMetadataSegmentManager(
ObjectMapper jsonMapper,
Expand All @@ -107,7 +105,7 @@ public SQLMetadataSegmentManager(
this.jsonMapper = jsonMapper;
this.config = config;
this.dbTables = dbTables;
this.dataSources = new AtomicReference<>(
this.dataSourcesRef = new AtomicReference<>(
new ConcurrentHashMap<String, DruidDataSource>()
);
this.connector = connector;
Expand All @@ -117,11 +115,11 @@ public SQLMetadataSegmentManager(
@LifecycleStart
public void start()
{
synchronized (lock) {
if (started) {
return;
}
if (!lifecycleLock.canStart()) {
return;
}

try {
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));

final Duration delay = config.get().getPollDuration().toStandardDuration();
Expand All @@ -144,26 +142,35 @@ public void run()
delay.getMillis(),
TimeUnit.MILLISECONDS
);
started = true;
lifecycleLock.started();
}
finally {
lifecycleLock.exitStart();
}
}

@Override
@LifecycleStop
public void stop()
{
synchronized (lock) {
if (!started) {
return;
}
if (!lifecycleLock.canStop()) {
return;
}
try {
final ConcurrentHashMap<String, DruidDataSource> emptyMap = new ConcurrentHashMap<>();
ConcurrentHashMap<String, DruidDataSource> current;
do {
current = dataSourcesRef.get();
Copy link
Copy Markdown
Contributor

@licl2014 licl2014 Apr 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, there is no guarantee that dataSourcesRef is safe; it may be updated in the loop above. @jihoonson @gianm @jon-wei

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was refactored extensively in #6370 and then renamed in #7306 to SqlSegmentsMetadataManager, so this issue may no longer exist. Have you been able to take a look at the new code?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which for loop are you referring to? Can you link here? Also, there has been a pretty big refactoring for this class. Do you still see the same issue?

} while (!dataSourcesRef.compareAndSet(current, emptyMap));

started = false;
dataSources.set(new ConcurrentHashMap<String, DruidDataSource>());
future.cancel(false);
future = null;
exec.shutdownNow();
exec = null;
}
finally {
lifecycleLock.exitStop();
}
}

@Override
Expand Down Expand Up @@ -302,31 +309,17 @@ public Void withHandle(Handle handle) throws Exception
public boolean removeDatasource(final String ds)
{
try {
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();
final int removed = connector.getDBI().withHandle(
handle -> handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
).bind("dataSource", ds).execute()
);

dataSourcesRef.get().remove(ds);

if (!dataSourceMap.containsKey(ds)) {
log.warn("Cannot delete datasource %s, does not exist", ds);
if (removed == 0) {
return false;
}

connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", getSegmentsTable())
)
.bind("dataSource", ds)
.execute();

return null;
}
}
);

dataSourceMap.remove(ds);
}
catch (Exception e) {
log.error(e, "Error removing datasource %s", ds);
Expand All @@ -340,34 +333,25 @@ public Void withHandle(Handle handle) throws Exception
public boolean removeSegment(String ds, final String segmentID)
{
try {
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable())
).bind("segmentID", segmentID)
.execute();

return null;
}
}
final int removed = connector.getDBI().withHandle(
handle -> handle.createStatement(
StringUtils.format("UPDATE %s SET used=false WHERE id = :segmentID", getSegmentsTable())
).bind("segmentID", segmentID).execute()
);

ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSources.get();

if (!dataSourceMap.containsKey(ds)) {
log.warn("Cannot find datasource %s", ds);
return false;
}
ConcurrentHashMap<String, DruidDataSource> dataSourceMap = dataSourcesRef.get();

DruidDataSource dataSource = dataSourceMap.get(ds);
dataSource.removePartition(segmentID);
if (dataSource != null) {
dataSource.removePartition(segmentID);

if (dataSource.isEmpty()) {
dataSourceMap.remove(ds);
if (dataSource.isEmpty()) {
dataSourceMap.remove(ds);
}
}

if (removed == 0) {
return false;
}
}
catch (Exception e) {
Expand All @@ -381,69 +365,59 @@ public Void withHandle(Handle handle) throws Exception
@Override
public boolean isStarted()
{
  // Lifecycle state is tracked by the LifecycleLock; the old volatile 'started' flag was removed,
  // so the leftover pre-change return line (unreachable, referencing a removed field) is dropped.
  return lifecycleLock.isStarted();
}

@Override
public DruidDataSource getInventoryValue(String key)
{
  // Read through the renamed dataSourcesRef field; the stale pre-change return line
  // (referencing the removed 'dataSources' field) is dropped.
  return dataSourcesRef.get().get(key);
}

@Override
public Collection<DruidDataSource> getInventory()
{
  // Snapshot the current map and expose its live values view; the stale pre-change return line
  // (referencing the removed 'dataSources' field) is dropped.
  return dataSourcesRef.get().values();
}

@Override
public Collection<String> getAllDatasourceNames()
{
  // Queries the metadata store directly for distinct datasource names. The scraped diff contained
  // both the pre-change body (synchronized + anonymous HandleCallback) and the post-change body;
  // only the post-change version is kept. The synchronized(lock) block was removed because this
  // method touches no shared in-memory state — it only issues a read-only SQL query.
  return connector.getDBI().withHandle(
      handle -> handle.createQuery(
          StringUtils.format("SELECT DISTINCT(datasource) FROM %s", getSegmentsTable())
      )
      .fold(
          new ArrayList<>(),
          new Folder3<List<String>, Map<String, Object>>()
          {
            @Override
            public List<String> fold(
                List<String> druidDataSources,
                Map<String, Object> stringObjectMap,
                FoldController foldController,
                StatementContext statementContext
            ) throws SQLException
            {
              // Each result row is a single-column map; accumulate the datasource name.
              druidDataSources.add(
                  MapUtils.getString(stringObjectMap, "datasource")
              );
              return druidDataSources;
            }
          }
      )
  );
}

@Override
public void poll()
{
try {
if (!started) {
if (!lifecycleLock.isStarted()) {
return;
}

ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<String, DruidDataSource>();
ConcurrentHashMap<String, DruidDataSource> newDataSources = new ConcurrentHashMap<>();

log.debug("Starting polling of segment table");

Expand Down Expand Up @@ -525,11 +499,10 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx)
}
}

synchronized (lock) {
if (started) {
dataSources.set(newDataSources);
}
}
ConcurrentHashMap<String, DruidDataSource> current;
do {
current = dataSourcesRef.get();
} while (!dataSourcesRef.compareAndSet(current, newDataSources));
}
catch (Exception e) {
log.makeAlert(e, "Problem polling DB.").emit();
Expand Down
Loading