diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 505cf002929a52..ecb6048406405c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -46,10 +46,10 @@ public class ConnectScheduler { private static final Logger LOG = LogManager.getLogger(ConnectScheduler.class); private int maxConnections; - private int numberConnection; + private AtomicInteger numberConnection; private AtomicInteger nextConnectionId; - private Map connectionMap = Maps.newConcurrentMap(); - private Map connByUser = Maps.newHashMap(); + private Map connectionMap = Maps.newConcurrentMap(); + private Map connByUser = Maps.newConcurrentMap(); private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true); // Use a thread to check whether connection is timeout. Because @@ -61,7 +61,7 @@ public class ConnectScheduler { public ConnectScheduler(int maxConnections) { this.maxConnections = maxConnections; - numberConnection = 0; + numberConnection = new AtomicInteger(0); nextConnectionId = new AtomicInteger(0); checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS); } @@ -70,10 +70,8 @@ private class TimeoutChecker extends TimerTask { @Override public void run() { long now = System.currentTimeMillis(); - synchronized (ConnectScheduler.this) { - for (ConnectContext connectContext : connectionMap.values()) { - connectContext.checkTimeout(now); - } + for (ConnectContext connectContext : connectionMap.values()) { + connectContext.checkTimeout(now); } } } @@ -96,50 +94,50 @@ public boolean submit(ConnectContext context) { } // Register one connection with its connection id. - public synchronized boolean registerConnection(ConnectContext ctx) { - if (numberConnection >= maxConnections) { + public boolean registerConnection(ConnectContext ctx) { + if (numberConnection.incrementAndGet() > maxConnections) { + numberConnection.decrementAndGet(); return false; } // Check user - if (connByUser.get(ctx.getQualifiedUser()) == null) { - connByUser.put(ctx.getQualifiedUser(), new AtomicInteger(0)); - } - int conns = connByUser.get(ctx.getQualifiedUser()).get(); + connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0)); + AtomicInteger conns = connByUser.get(ctx.getQualifiedUser()); if (ctx.getIsTempUser()) { - if (conns >= LdapAuthenticate.getMaxConn()) { + if (conns.incrementAndGet() > LdapAuthenticate.getMaxConn()) { + conns.decrementAndGet(); + numberConnection.decrementAndGet(); return false; } - } else if (conns >= ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) { + } else if (conns.incrementAndGet() > ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) { + conns.decrementAndGet(); + numberConnection.decrementAndGet(); return false; } - numberConnection++; - connByUser.get(ctx.getQualifiedUser()).incrementAndGet(); - connectionMap.put((long) ctx.getConnectionId(), ctx); + connectionMap.put(ctx.getConnectionId(), ctx); return true; } - public synchronized void unregisterConnection(ConnectContext ctx) { + public void unregisterConnection(ConnectContext ctx) { ctx.closeTxn(); - if (connectionMap.remove((long) ctx.getConnectionId()) != null) { - numberConnection--; + if (connectionMap.remove(ctx.getConnectionId()) != null) { AtomicInteger conns = connByUser.get(ctx.getQualifiedUser()); if (conns != null) { conns.decrementAndGet(); } + numberConnection.decrementAndGet(); } } - public synchronized ConnectContext getContext(long connectionId) { + public ConnectContext getContext(long connectionId) { return connectionMap.get(connectionId); } - public synchronized int getConnectionNum() { - return numberConnection; + public int getConnectionNum() { + return numberConnection.get(); } - public synchronized List listConnection(String user) { + public List listConnection(String user) { List infos = Lists.newArrayList(); - for (ConnectContext ctx : connectionMap.values()) { // Check auth if (!ctx.getQualifiedUser().equals(user) && diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 407bbea0404ee5..ae075a1d2ce103 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -471,8 +471,9 @@ public void exec() throws Exception { addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), queryOptions.query_timeout * 1000); - - LOG.info("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host); + if (LOG.isDebugEnabled()) { + LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host); + } if (topDataSink instanceof ResultFileSink && ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index c05bf008e7baff..449aa7fd63c4da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -81,7 +81,9 @@ public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserExcep @Override public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException { - LOG.info("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId()); + if (LOG.isDebugEnabled()) { + LOG.debug("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId()); + } final QueryInfo result = coordinatorMap.putIfAbsent(queryId, info); if (result != null) { throw new UserException("queryId " + queryId + " already exists"); @@ -121,7 +123,9 @@ public Map getInstancesNumPerUser() { public void unregisterQuery(TUniqueId queryId) { QueryInfo queryInfo = coordinatorMap.remove(queryId); if (queryInfo != null) { - LOG.info("deregister query id {}", DebugUtil.printId(queryId)); + if (LOG.isDebugEnabled()) { + LOG.debug("deregister query id {}", DebugUtil.printId(queryId)); + } if (queryInfo.getConnectContext() != null && !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser()) ) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index a0d11fd075688e..0bad23f772e4d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -526,7 +526,9 @@ public void writeProfile(boolean isLastWriteProfile) { // Analyze one statement to structure in memory. public void analyze(TQueryOptions tQueryOptions) throws UserException { - LOG.info("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId()); + if (LOG.isDebugEnabled()) { + LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId()); + } parse();