Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AvaticaServerConfig
public int maxStatementsPerConnection = 4;

@JsonProperty
public Period connectionIdleTimeout = new Period("PT30M");
public Period connectionIdleTimeout = new Period("PT5M");

public int getMaxConnections()
{
Expand Down
102 changes: 91 additions & 11 deletions sql/src/main/java/io/druid/sql/avatica/DruidConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,123 @@

package io.druid.sql.avatica;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;

import javax.annotation.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* Connection tracking for {@link DruidMeta}. Not thread-safe.
* Connection tracking for {@link DruidMeta}. Thread-safe.
*/
public class DruidConnection
{
private final Map<String, Object> context;
private static final Logger log = new Logger(DruidConnection.class);

private final String connectionId;
private final int maxStatements;
private final ImmutableMap<String, Object> context;
private final AtomicInteger statementCounter = new AtomicInteger();
private final AtomicReference<Future<?>> timeoutFuture = new AtomicReference<>();

@GuardedBy("statements")
private final Map<Integer, DruidStatement> statements;
private Future<?> timeoutFuture;

public DruidConnection(final Map<String, Object> context)
@GuardedBy("statements")
private boolean open = true;

public DruidConnection(final String connectionId, final int maxStatements, final Map<String, Object> context)
{
this.connectionId = Preconditions.checkNotNull(connectionId);
this.maxStatements = maxStatements;
this.context = ImmutableMap.copyOf(context);
this.statements = new HashMap<>();
}

public Map<String, Object> context()
public DruidStatement createStatement()
{
final int statementId = statementCounter.incrementAndGet();

synchronized (statements) {
if (statements.containsKey(statementId)) {
// Will only happen if statementCounter rolls over before old statements are cleaned up. If this
// ever happens then something fishy is going on, because we shouldn't have billions of statements.
throw new ISE("Uh oh, too many statements");
}

if (statements.size() >= maxStatements) {
throw new ISE("Too many open statements, limit is[%,d]", maxStatements);
}

final DruidStatement statement = new DruidStatement(connectionId, statementId, context, () -> {
// onClose function for the statement
synchronized (statements) {
log.debug("Connection[%s] closed statement[%s].", connectionId, statementId);
statements.remove(statementId);
}
});

statements.put(statementId, statement);
log.debug("Connection[%s] opened statement[%s].", connectionId, statementId);
return statement;
}
}

public DruidStatement getStatement(final int statementId)
{
return context;
synchronized (statements) {
return statements.get(statementId);
}
}

public Map<Integer, DruidStatement> statements()
/**
* Closes this connection if it has no statements.
*
* @return true if closed
*/
public boolean closeIfEmpty()
{
return statements;
synchronized (statements) {
if (statements.isEmpty()) {
close();
return true;
} else {
return false;
}
}
}

public void close()
{
synchronized (statements) {
// Copy statements before iterating because statement.close() modifies it.
for (DruidStatement statement : ImmutableList.copyOf(statements.values())) {
try {
statement.close();
}
catch (Exception e) {
log.warn("Connection[%s] failed to close statement[%s]!", connectionId, statement.getStatementId());
}
}

log.debug("Connection[%s] closed.", connectionId);
open = false;
}
}

public DruidConnection sync(final Future<?> newTimeoutFuture)
{
if (timeoutFuture != null) {
timeoutFuture.cancel(false);
final Future<?> oldFuture = timeoutFuture.getAndSet(newTimeoutFuture);
if (oldFuture != null) {
oldFuture.cancel(false);
}
timeoutFuture = newTimeoutFuture;
return this;
}
}
Loading