Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
public class ConnectScheduler {
private static final Logger LOG = LogManager.getLogger(ConnectScheduler.class);
private int maxConnections;
private int numberConnection;
private AtomicInteger numberConnection;
private AtomicInteger nextConnectionId;
private Map<Long, ConnectContext> connectionMap = Maps.newConcurrentMap();
private Map<String, AtomicInteger> connByUser = Maps.newHashMap();
private Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap();
private Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap();
private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true);

// Use a thread to check whether connection is timeout. Because
Expand All @@ -61,7 +61,7 @@ public class ConnectScheduler {

public ConnectScheduler(int maxConnections) {
this.maxConnections = maxConnections;
numberConnection = 0;
numberConnection = new AtomicInteger(0);
nextConnectionId = new AtomicInteger(0);
checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);
}
Expand All @@ -70,10 +70,8 @@ private class TimeoutChecker extends TimerTask {
@Override
public void run() {
long now = System.currentTimeMillis();
synchronized (ConnectScheduler.this) {
for (ConnectContext connectContext : connectionMap.values()) {
connectContext.checkTimeout(now);
}
for (ConnectContext connectContext : connectionMap.values()) {
connectContext.checkTimeout(now);
}
}
}
Expand All @@ -96,50 +94,50 @@ public boolean submit(ConnectContext context) {
}

// Register one connection with its connection id.
public synchronized boolean registerConnection(ConnectContext ctx) {
if (numberConnection >= maxConnections) {
public boolean registerConnection(ConnectContext ctx) {
if (numberConnection.incrementAndGet() > maxConnections) {
numberConnection.decrementAndGet();
return false;
}
// Check user
if (connByUser.get(ctx.getQualifiedUser()) == null) {
connByUser.put(ctx.getQualifiedUser(), new AtomicInteger(0));
}
int conns = connByUser.get(ctx.getQualifiedUser()).get();
connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
if (ctx.getIsTempUser()) {
if (conns >= LdapAuthenticate.getMaxConn()) {
if (conns.incrementAndGet() > LdapAuthenticate.getMaxConn()) {
conns.decrementAndGet();
numberConnection.decrementAndGet();
return false;
}
} else if (conns >= ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
} else if (conns.incrementAndGet() > ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
conns.decrementAndGet();
numberConnection.decrementAndGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why call numberConnection.decrementAndGet(); here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why call numberConnection.decrementAndGet(); here?

we should reduce the connection number when connect failed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I miss it.

return false;
}
numberConnection++;
connByUser.get(ctx.getQualifiedUser()).incrementAndGet();
connectionMap.put((long) ctx.getConnectionId(), ctx);
connectionMap.put(ctx.getConnectionId(), ctx);
return true;
}

public synchronized void unregisterConnection(ConnectContext ctx) {
public void unregisterConnection(ConnectContext ctx) {
ctx.closeTxn();
if (connectionMap.remove((long) ctx.getConnectionId()) != null) {
numberConnection--;
if (connectionMap.remove(ctx.getConnectionId()) != null) {
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
if (conns != null) {
conns.decrementAndGet();
}
numberConnection.decrementAndGet();
}
}

public synchronized ConnectContext getContext(long connectionId) {
public ConnectContext getContext(long connectionId) {
return connectionMap.get(connectionId);
}

public synchronized int getConnectionNum() {
return numberConnection;
public int getConnectionNum() {
return numberConnection.get();
}

public synchronized List<ConnectContext.ThreadInfo> listConnection(String user) {
public List<ConnectContext.ThreadInfo> listConnection(String user) {
List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();

for (ConnectContext ctx : connectionMap.values()) {
// Check auth
if (!ctx.getQualifiedUser().equals(user) &&
Expand Down
5 changes: 3 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,9 @@ public void exec() throws Exception {
addressToBackendID.get(execBeAddr),
toBrpcHost(execBeAddr),
queryOptions.query_timeout * 1000);

LOG.info("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
if (LOG.isDebugEnabled()) {
LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this log is useful when tracking a query. Does it really impact the performance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by the flame graph and the statistic provided above,for most users,query performance is the most important thing

}

if (topDataSink instanceof ResultFileSink
&& ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserExcep

@Override
public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException {
LOG.info("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
if (LOG.isDebugEnabled()) {
LOG.debug("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
}
final QueryInfo result = coordinatorMap.putIfAbsent(queryId, info);
if (result != null) {
throw new UserException("queryId " + queryId + " already exists");
Expand Down Expand Up @@ -121,7 +123,9 @@ public Map<String, Integer> getInstancesNumPerUser() {
public void unregisterQuery(TUniqueId queryId) {
QueryInfo queryInfo = coordinatorMap.remove(queryId);
if (queryInfo != null) {
LOG.info("deregister query id {}", DebugUtil.printId(queryId));
if (LOG.isDebugEnabled()) {
LOG.debug("deregister query id {}", DebugUtil.printId(queryId));
}
if (queryInfo.getConnectContext() != null &&
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,9 @@ public void writeProfile(boolean isLastWriteProfile) {

// Analyze one statement to structure in memory.
public void analyze(TQueryOptions tQueryOptions) throws UserException {
LOG.info("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId());
if (LOG.isDebugEnabled()) {
LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId());
}

parse();

Expand Down