From 5138474b20f4c482e7ed6678f89ff86704d47c05 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Fri, 28 Mar 2025 19:59:26 -0700 Subject: [PATCH 1/8] Changed deprecated BrokerClient --- .../apache/druid/msq/exec/SegmentLoadStatusFetcher.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index d4eaef600125..e84f1745e991 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -27,17 +27,15 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.discovery.BrokerClient; +import org.apache.druid.sql.client.BrokerClient; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.http.client.Request; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.timeline.DataSegment; -import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -238,13 +236,11 @@ private void updateStatus(State state, DateTime startTime) */ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception { - Request request = brokerClient.makeRequest(HttpMethod.POST, "/druid/v2/sql/"); SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), ResultFormat.OBJECTLINES, false, false, false, null, null ); - request.setContent(MediaType.APPLICATION_JSON, objectMapper.writeValueAsBytes(sqlQuery)); - String response = brokerClient.sendQuery(request); + String response = brokerClient.submit(sqlQuery); if (response == null) { // Unable to query broker From f11a3e92c803196ff378c36fa80b50601f959d0a Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Sat, 29 Mar 2025 17:46:27 -0700 Subject: [PATCH 2/8] Fixed ClientSqlQuery --- .../msq/exec/SegmentLoadStatusFetcher.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index e84f1745e991..33be406e9452 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -26,15 +26,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.sql.client.BrokerClient; +import org.apache.druid.client.broker.BrokerClientImpl; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.sql.http.ResultFormat; -import org.apache.druid.sql.http.SqlQuery; +import org.apache.druid.query.http.ClientSqlQuery; +import org.apache.druid.query.http.SqlTaskStatus; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -86,7 +88,7 @@ public class SegmentLoadStatusFetcher implements AutoCloseable + "FROM sys.segments\n" + "WHERE datasource = '%s' AND is_overshadowed = 0 AND (%s)"; - private final BrokerClient brokerClient; + private final BrokerClientImpl brokerClient; private final ObjectMapper objectMapper; // Map of version vs latest load status. private final AtomicReference versionLoadStatusReference; @@ -100,7 +102,7 @@ public class SegmentLoadStatusFetcher implements AutoCloseable private final ListeningExecutorService executorService; public SegmentLoadStatusFetcher( - BrokerClient brokerClient, + BrokerClientImpl brokerClient, ObjectMapper objectMapper, String queryId, String datasource, @@ -236,20 +238,22 @@ private void updateStatus(State state, DateTime startTime) */ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception { - SqlQuery sqlQuery = new SqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), - ResultFormat.OBJECTLINES, + ClientSqlQuery clientSqlQuery = new ClientSqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), + ResultFormat.OBJECTLINES.contentType(), false, false, false, null, null ); - String response = brokerClient.submit(sqlQuery); + ListenableFuture response = brokerClient.submitSqlTask(clientSqlQuery); + SqlTaskStatus taskStatus = response.get(); + String taskId = taskStatus.getTaskId(); - if (response == null) { + if (taskId == null) { // Unable to query broker return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); - } else if (response.trim().isEmpty()) { + } else if (taskId.trim().isEmpty()) { // If no segments are returned for a version, all segments have been dropped by a drop rule. return new VersionLoadStatus(0, 0, 0, 0, 0); } else { - return objectMapper.readValue(response, VersionLoadStatus.class); + return objectMapper.readValue(taskId, VersionLoadStatus.class); } } From b79bd8b48c643bc3de7109b6d0f12dd93a2da2d5 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Mon, 31 Mar 2025 15:24:29 -0700 Subject: [PATCH 3/8] Fixed SegmentLoadStatusFetcher --- .../apache/druid/msq/exec/ControllerImpl.java | 2 +- .../msq/exec/SegmentLoadStatusFetcher.java | 18 +++-- .../exec/SegmentLoadStatusFetcherTest.java | 76 ++++++++++++++----- 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index cc73ea42c1a4..f890a9e9ac0f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -34,10 +34,10 @@ import it.unimi.dsi.fastutil.ints.IntArraySet; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.StringTuple; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.discovery.BrokerClient; import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.channel.ReadableConcatFrameChannel; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index 33be406e9452..99c3a0187b84 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -24,25 +24,25 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.client.broker.BrokerClientImpl; +import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.query.http.SqlTaskStatus; +import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import javax.ws.rs.core.MediaType; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -88,7 +88,7 @@ public class SegmentLoadStatusFetcher implements AutoCloseable + "FROM sys.segments\n" + "WHERE datasource = '%s' AND is_overshadowed = 0 AND (%s)"; - private final BrokerClientImpl brokerClient; + private final BrokerClient brokerClient; private final ObjectMapper objectMapper; // Map of version vs latest load status. private final AtomicReference versionLoadStatusReference; @@ -102,7 +102,7 @@ public class SegmentLoadStatusFetcher implements AutoCloseable private final ListeningExecutorService executorService; public SegmentLoadStatusFetcher( - BrokerClientImpl brokerClient, + BrokerClient brokerClient, ObjectMapper objectMapper, String queryId, String datasource, @@ -244,6 +244,12 @@ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception ); ListenableFuture response = brokerClient.submitSqlTask(clientSqlQuery); SqlTaskStatus taskStatus = response.get(); + if (taskStatus == null || taskStatus.getState() != TaskState.SUCCESS) { + // Unable to query broker or task failed + log.warn("Failed to get load status from broker, task state: %s", + taskStatus != null ? taskStatus.getState() : "null"); + return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); + } String taskId = taskStatus.getTaskId(); if (taskId == null) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java index 548a7ac473e9..46a0473c30aa 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java @@ -20,9 +20,12 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.discovery.BrokerClient; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.broker.BrokerClient; +import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.query.http.ClientSqlQuery; +import org.apache.druid.query.http.SqlTaskStatus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.junit.Assert; @@ -34,12 +37,12 @@ import java.util.stream.IntStream; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class SegmentLoadStatusFetcherTest { @@ -55,15 +58,17 @@ public class SegmentLoadStatusFetcherTest @Test public void testSingleVersionWaitsForLoadCorrectly() throws Exception { + @SuppressWarnings("unchecked") + ListenableFuture mockFuture = mock(ListenableFuture.class); brokerClient = mock(BrokerClient.class); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); - doAnswer(new Answer() + doReturn(mock(ListenableFuture.class)).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + doAnswer(new Answer>() { int timesInvoked = 0; @Override - public String answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { timesInvoked += 1; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -73,9 +78,16 @@ public String answer(InvocationOnMock invocation) throws Throwable 5 - timesInvoked, 0 ); - return new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); + SqlTaskStatus taskStatus = new SqlTaskStatus( + jsonResponse, + timesInvoked >= 5 ? TaskState.SUCCESS : TaskState.RUNNING, + null + ); + when(mockFuture.get()).thenReturn(taskStatus); + return mockFuture; } - }).when(brokerClient).sendQuery(any()); + }).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, new ObjectMapper(), @@ -86,21 +98,23 @@ public String answer(InvocationOnMock invocation) throws Throwable ); segmentLoadWaiter.waitForSegmentsToLoad(); - verify(brokerClient, times(5)).sendQuery(any()); + verify(brokerClient, times(5)).submitSqlTask(any(ClientSqlQuery.class)); } @Test public void testMultipleVersionWaitsForLoadCorrectly() throws Exception { + @SuppressWarnings("unchecked") + ListenableFuture mockFuture = mock(ListenableFuture.class); brokerClient = mock(BrokerClient.class); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); - doAnswer(new Answer() + doReturn(mock(ListenableFuture.class)).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenAnswer(new Answer>() { int timesInvoked = 0; @Override - public String answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { timesInvoked += 1; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -110,9 +124,17 @@ public String answer(InvocationOnMock invocation) throws Throwable 5 - timesInvoked, 0 ); - return new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); + + SqlTaskStatus taskStatus = new SqlTaskStatus( + jsonResponse, + TaskState.SUCCESS, + null + ); + when(mockFuture.get()).thenReturn(taskStatus); + return mockFuture; } - }).when(brokerClient).sendQuery(any()); + }); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, new ObjectMapper(), @@ -123,22 +145,25 @@ public String answer(InvocationOnMock invocation) throws Throwable ); segmentLoadWaiter.waitForSegmentsToLoad(); - verify(brokerClient, times(5)).sendQuery(any()); + verify(brokerClient, times(5)).submitSqlTask(any(ClientSqlQuery.class)); } @Test public void triggerCancellationFromAnotherThread() throws Exception { + @SuppressWarnings("unchecked") + ListenableFuture mockFuture = mock(ListenableFuture.class); brokerClient = mock(BrokerClient.class); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); - doAnswer(new Answer() + + doReturn(mock(ListenableFuture.class)).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + doAnswer(new Answer>() { int timesInvoked = 0; @Override - public String answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { - // sleeping broker call to simulate a long running query + // sleeping broker call to simulate a long-running query Thread.sleep(1000); timesInvoked++; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -148,9 +173,18 @@ public String answer(InvocationOnMock invocation) throws Throwable 5 - timesInvoked, 0 ); - return new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); + + SqlTaskStatus taskStatus = new SqlTaskStatus( + jsonResponse, + TaskState.SUCCESS, + null + ); + when(mockFuture.get()).thenReturn(taskStatus); + + return mockFuture; } - }).when(brokerClient).sendQuery(any()); + }).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, new ObjectMapper(), From 66d41aaf1b83d2d11f96010eee58b435448b8b52 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Tue, 1 Apr 2025 23:42:37 -0700 Subject: [PATCH 4/8] Fixed Do Not Mock ListenableFuture --- .../exec/SegmentLoadStatusFetcherTest.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java index 46a0473c30aa..f639076b22d7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.exec; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.indexer.TaskState; @@ -58,11 +59,11 @@ public class SegmentLoadStatusFetcherTest @Test public void testSingleVersionWaitsForLoadCorrectly() throws Exception { - @SuppressWarnings("unchecked") - ListenableFuture mockFuture = mock(ListenableFuture.class); brokerClient = mock(BrokerClient.class); - doReturn(mock(ListenableFuture.class)).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + SqlTaskStatus dummyStatus = new SqlTaskStatus("dummy-id", TaskState.RUNNING, null); + when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyStatus)); + doAnswer(new Answer>() { int timesInvoked = 0; @@ -84,8 +85,7 @@ public ListenableFuture answer(InvocationOnMock invocation) throw timesInvoked >= 5 ? TaskState.SUCCESS : TaskState.RUNNING, null ); - when(mockFuture.get()).thenReturn(taskStatus); - return mockFuture; + return Futures.immediateFuture(taskStatus); } }).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( @@ -104,11 +104,11 @@ public ListenableFuture answer(InvocationOnMock invocation) throw @Test public void testMultipleVersionWaitsForLoadCorrectly() throws Exception { - @SuppressWarnings("unchecked") - ListenableFuture mockFuture = mock(ListenableFuture.class); brokerClient = mock(BrokerClient.class); - doReturn(mock(ListenableFuture.class)).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + SqlTaskStatus dummyStatus = new SqlTaskStatus("dummy-id", TaskState.RUNNING, null); + when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyStatus)); + when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenAnswer(new Answer>() { int timesInvoked = 0; @@ -131,8 +131,7 @@ public ListenableFuture answer(InvocationOnMock invocation) throw TaskState.SUCCESS, null ); - when(mockFuture.get()).thenReturn(taskStatus); - return mockFuture; + return Futures.immediateFuture(taskStatus); } }); segmentLoadWaiter = new SegmentLoadStatusFetcher( @@ -151,11 +150,11 @@ public ListenableFuture answer(InvocationOnMock invocation) throw @Test public void triggerCancellationFromAnotherThread() throws Exception { - @SuppressWarnings("unchecked") - ListenableFuture mockFuture = mock(ListenableFuture.class); brokerClient = mock(BrokerClient.class); - doReturn(mock(ListenableFuture.class)).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + SqlTaskStatus dummyStatus = new SqlTaskStatus("dummy-id", TaskState.RUNNING, null); + when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyStatus)); + doAnswer(new Answer>() { int timesInvoked = 0; @@ -180,9 +179,7 @@ public ListenableFuture answer(InvocationOnMock invocation) throw TaskState.SUCCESS, null ); - when(mockFuture.get()).thenReturn(taskStatus); - - return mockFuture; + return Futures.immediateFuture(taskStatus); } }).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( From 8705b07b22b3e8124ee2f2964210e6e5071bf044 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Wed, 2 Apr 2025 17:52:03 -0700 Subject: [PATCH 5/8] Sql endpoint used instead of sql task --- .../msq/exec/SegmentLoadStatusFetcher.java | 19 ++---- .../exec/SegmentLoadStatusFetcherTest.java | 60 ++++++------------- .../druid/client/broker/BrokerClient.java | 5 ++ .../druid/client/broker/BrokerClientImpl.java | 14 ++++- 4 files changed, 42 insertions(+), 56 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index 99c3a0187b84..c825793dac7e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -29,14 +29,12 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.common.guava.FutureUtils; -import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.http.ClientSqlQuery; -import org.apache.druid.query.http.SqlTaskStatus; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -242,24 +240,17 @@ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception ResultFormat.OBJECTLINES.contentType(), false, false, false, null, null ); - ListenableFuture response = brokerClient.submitSqlTask(clientSqlQuery); - SqlTaskStatus taskStatus = response.get(); - if (taskStatus == null || taskStatus.getState() != TaskState.SUCCESS) { - // Unable to query broker or task failed - log.warn("Failed to get load status from broker, task state: %s", - taskStatus != null ? taskStatus.getState() : "null"); - return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); - } - String taskId = taskStatus.getTaskId(); + ListenableFuture futureResponse = brokerClient.submitSqlQuery(clientSqlQuery); + String response = futureResponse.get(); - if (taskId == null) { + if (response == null) { // Unable to query broker return new VersionLoadStatus(0, 0, 0, 0, totalSegmentsGenerated); - } else if (taskId.trim().isEmpty()) { + } else if (response.trim().isEmpty()) { // If no segments are returned for a version, all segments have been dropped by a drop rule. return new VersionLoadStatus(0, 0, 0, 0, 0); } else { - return objectMapper.readValue(taskId, VersionLoadStatus.class); + return objectMapper.readValue(response, VersionLoadStatus.class); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java index f639076b22d7..49ad07dbf788 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java @@ -23,10 +23,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.broker.BrokerClient; -import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.http.ClientSqlQuery; -import org.apache.druid.query.http.SqlTaskStatus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.junit.Assert; @@ -39,7 +37,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -61,15 +58,14 @@ public void testSingleVersionWaitsForLoadCorrectly() throws Exception { brokerClient = mock(BrokerClient.class); - SqlTaskStatus dummyStatus = new SqlTaskStatus("dummy-id", TaskState.RUNNING, null); - when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyStatus)); - - doAnswer(new Answer>() + String dummyString = ""; + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString)); + doAnswer(new Answer>() { int timesInvoked = 0; @Override - public ListenableFuture answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { timesInvoked += 1; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -80,14 +76,9 @@ public ListenableFuture answer(InvocationOnMock invocation) throw 0 ); String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); - SqlTaskStatus taskStatus = new SqlTaskStatus( - jsonResponse, - timesInvoked >= 5 ? TaskState.SUCCESS : TaskState.RUNNING, - null - ); - return Futures.immediateFuture(taskStatus); + return Futures.immediateFuture(jsonResponse); } - }).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + }).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, new ObjectMapper(), @@ -98,7 +89,7 @@ public ListenableFuture answer(InvocationOnMock invocation) throw ); segmentLoadWaiter.waitForSegmentsToLoad(); - verify(brokerClient, times(5)).submitSqlTask(any(ClientSqlQuery.class)); + verify(brokerClient, times(5)).submitSqlQuery(any(ClientSqlQuery.class)); } @Test @@ -106,15 +97,14 @@ public void testMultipleVersionWaitsForLoadCorrectly() throws Exception { brokerClient = mock(BrokerClient.class); - SqlTaskStatus dummyStatus = new SqlTaskStatus("dummy-id", TaskState.RUNNING, null); - when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyStatus)); - - when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenAnswer(new Answer>() + String dummyString = ""; + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString)); + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenAnswer(new Answer>() { int timesInvoked = 0; @Override - public ListenableFuture answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { timesInvoked += 1; SegmentLoadStatusFetcher.VersionLoadStatus loadStatus = new SegmentLoadStatusFetcher.VersionLoadStatus( @@ -125,13 +115,7 @@ public ListenableFuture answer(InvocationOnMock invocation) throw 0 ); String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); - - SqlTaskStatus taskStatus = new SqlTaskStatus( - jsonResponse, - TaskState.SUCCESS, - null - ); - return Futures.immediateFuture(taskStatus); + return Futures.immediateFuture(jsonResponse); } }); segmentLoadWaiter = new SegmentLoadStatusFetcher( @@ -144,7 +128,7 @@ public ListenableFuture answer(InvocationOnMock invocation) throw ); segmentLoadWaiter.waitForSegmentsToLoad(); - verify(brokerClient, times(5)).submitSqlTask(any(ClientSqlQuery.class)); + verify(brokerClient, times(5)).submitSqlQuery(any(ClientSqlQuery.class)); } @Test @@ -152,15 +136,15 @@ public void triggerCancellationFromAnotherThread() throws Exception { brokerClient = mock(BrokerClient.class); - SqlTaskStatus dummyStatus = new SqlTaskStatus("dummy-id", TaskState.RUNNING, null); - when(brokerClient.submitSqlTask(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyStatus)); + String dummyString = ""; + when(brokerClient.submitSqlQuery(any(ClientSqlQuery.class))).thenReturn(Futures.immediateFuture(dummyString)); - doAnswer(new Answer>() + doAnswer(new Answer>() { int timesInvoked = 0; @Override - public ListenableFuture answer(InvocationOnMock invocation) throws Throwable + public ListenableFuture answer(InvocationOnMock invocation) throws Throwable { // sleeping broker call to simulate a long-running query Thread.sleep(1000); @@ -173,15 +157,9 @@ public ListenableFuture answer(InvocationOnMock invocation) throw 0 ); String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); - - SqlTaskStatus taskStatus = new SqlTaskStatus( - jsonResponse, - TaskState.SUCCESS, - null - ); - return Futures.immediateFuture(taskStatus); + return Futures.immediateFuture(jsonResponse); } - }).when(brokerClient).submitSqlTask(any(ClientSqlQuery.class)); + }).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, new ObjectMapper(), diff --git a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java index 611e6399ee69..ea370572c447 100644 --- a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java +++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java @@ -37,6 +37,11 @@ */ public interface BrokerClient { + /** + * Submit the given {@code sqlQuery} to the Broker's SQL query endpoint. + */ + ListenableFuture submitSqlQuery(ClientSqlQuery sqlQuery); + /** * Submit the given {@code sqlQuery} to the Broker's SQL task endpoint. */ diff --git a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java index 728a67401b2e..2f387dec0c2c 100644 --- a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java @@ -32,7 +32,6 @@ import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; import org.jboss.netty.handler.codec.http.HttpMethod; - import java.util.List; public class BrokerClientImpl implements BrokerClient @@ -46,6 +45,19 @@ public BrokerClientImpl(final ServiceClient client, final ObjectMapper jsonMappe this.jsonMapper = jsonMapper; } + @Override + public ListenableFuture submitSqlQuery(final ClientSqlQuery sqlQuery) + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/") + .jsonContent(jsonMapper, sqlQuery), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), String.class) + ); + } + @Override public ListenableFuture submitSqlTask(final ClientSqlQuery sqlQuery) { From e5c1b173a17b5266f815c4313491f635dfa7ac85 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Wed, 2 Apr 2025 22:57:01 -0700 Subject: [PATCH 6/8] Style changes --- .../druid/msq/exec/SegmentLoadStatusFetcher.java | 11 +++++------ .../msq/exec/SegmentLoadStatusFetcherTest.java | 14 ++++++++------ .../druid/client/broker/BrokerClientImpl.java | 8 ++++++-- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java index c825793dac7e..814f4d8a63db 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcher.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.broker.BrokerClient; @@ -236,12 +235,12 @@ private void updateStatus(State state, DateTime startTime) */ private VersionLoadStatus fetchLoadStatusFromBroker() throws Exception { - ClientSqlQuery clientSqlQuery = new ClientSqlQuery(StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), - ResultFormat.OBJECTLINES.contentType(), - false, false, false, null, null + ClientSqlQuery clientSqlQuery = new ClientSqlQuery( + StringUtils.format(LOAD_QUERY, datasource, versionsConditionString), + ResultFormat.OBJECTLINES.contentType(), + false, false, false, null, null ); - ListenableFuture futureResponse = brokerClient.submitSqlQuery(clientSqlQuery); - String response = futureResponse.get(); + final String response = FutureUtils.get(brokerClient.submitSqlQuery(clientSqlQuery), true); if (response == null) { // Unable to query broker diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java index 49ad07dbf788..fdebcc7a655f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/SegmentLoadStatusFetcherTest.java @@ -46,6 +46,8 @@ public class SegmentLoadStatusFetcherTest { private static final String TEST_DATASOURCE = "testDatasource"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private SegmentLoadStatusFetcher segmentLoadWaiter; private BrokerClient brokerClient; @@ -75,13 +77,13 @@ public ListenableFuture answer(InvocationOnMock invocation) throws Throw 5 - timesInvoked, 0 ); - String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus); return Futures.immediateFuture(jsonResponse); } }).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, - new ObjectMapper(), + OBJECT_MAPPER, "id", TEST_DATASOURCE, IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()), @@ -114,13 +116,13 @@ public ListenableFuture answer(InvocationOnMock invocation) throws Throw 5 - timesInvoked, 0 ); - String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus); return Futures.immediateFuture(jsonResponse); } }); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, - new ObjectMapper(), + OBJECT_MAPPER, "id", TEST_DATASOURCE, IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()), @@ -156,13 +158,13 @@ public ListenableFuture answer(InvocationOnMock invocation) throws Throw 5 - timesInvoked, 0 ); - String jsonResponse = new ObjectMapper().writeValueAsString(loadStatus); + String jsonResponse = OBJECT_MAPPER.writeValueAsString(loadStatus); return Futures.immediateFuture(jsonResponse); } }).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); segmentLoadWaiter = new SegmentLoadStatusFetcher( brokerClient, - new ObjectMapper(), + OBJECT_MAPPER, "id", TEST_DATASOURCE, IntStream.range(0, 5).boxed().map(partitionNum -> createTestDataSegment("version1", partitionNum)).collect(Collectors.toSet()), diff --git a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java index 2f387dec0c2c..5ad609147429 100644 --- a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java @@ -26,12 +26,16 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; +import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; import org.apache.druid.query.explain.ExplainPlan; import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.query.http.SqlTaskStatus; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; import org.jboss.netty.handler.codec.http.HttpMethod; + +import java.nio.charset.StandardCharsets; import java.util.List; public class BrokerClientImpl implements BrokerClient @@ -52,9 +56,9 @@ public ListenableFuture submitSqlQuery(final ClientSqlQuery sqlQuery) client.asyncRequest( new RequestBuilder(HttpMethod.POST, "/druid/v2/sql/") .jsonContent(jsonMapper, sqlQuery), - new BytesFullResponseHandler() + new StringFullResponseHandler(StandardCharsets.UTF_8) ), - holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), String.class) + FullResponseHolder::getContent ); } From 3a8940894d6ecca704b9100df5cead486ff67834 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Thu, 3 Apr 2025 02:19:43 -0700 Subject: [PATCH 7/8] Deleted deprecated BrokerClient --- .../apache/druid/msq/test/MSQTestBase.java | 7 +- .../apache/druid/discovery/BrokerClient.java | 127 ------------ .../druid/discovery/BrokerClientTest.java | 182 ------------------ 3 files changed, 4 insertions(+), 312 deletions(-) delete mode 100644 server/src/main/java/org/apache/druid/discovery/BrokerClient.java delete mode 100644 server/src/test/java/org/apache/druid/discovery/BrokerClientTest.java diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 951e08d35a3d..5f1731008052 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -38,12 +38,12 @@ import com.google.inject.util.Providers; import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.client.ImmutableSegmentLoadInfo; +import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; -import org.apache.druid.discovery.BrokerClient; import org.apache.druid.discovery.NodeRole; import org.apache.druid.frame.channel.FrameChannelSequence; import org.apache.druid.frame.processor.Bouncer; @@ -133,6 +133,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; @@ -247,7 +248,7 @@ import static org.mockito.Mockito.mock; /** - * Base test runner for running MSQ unit tests. It sets up multi stage query execution environment + * Base test runner for running MSQ unit tests. It sets up multi-stage query execution environment * and populates data for the datasources. The runner does not go via the HTTP layer for communication between the * various MSQ processes. *

@@ -551,7 +552,7 @@ public String getFormatString() objectMapper.registerModules(sqlModule.getJacksonModules()); objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList()); - doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString()); + doReturn(mock(Request.class)).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper, injector)); indexingServiceClient = new MSQTestOverlordServiceClient( diff --git a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java b/server/src/main/java/org/apache/druid/discovery/BrokerClient.java deleted file mode 100644 index a0ddbf42bed8..000000000000 --- a/server/src/main/java/org/apache/druid/discovery/BrokerClient.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.discovery; - -import com.google.inject.Inject; -import org.apache.druid.error.DruidException; -import org.apache.druid.guice.annotations.EscalatedGlobal; -import org.apache.druid.java.util.common.IOE; -import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; -import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; -import org.apache.druid.rpc.ServiceClient; -import org.jboss.netty.channel.ChannelException; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.ExecutionException; - -/** - * This class facilitates interaction with Broker. - * Note that this should be removed and reconciled with org.apache.druid.sql.client.BrokerClient, which has the - * built-in functionality of {@link ServiceClient}, and proper Guice and service discovery wired in. - */ -@Deprecated -public class BrokerClient -{ - private static final int MAX_RETRIES = 5; - - private final HttpClient brokerHttpClient; - private final DruidNodeDiscovery druidNodeDiscovery; - - @Inject - public BrokerClient( - @EscalatedGlobal HttpClient brokerHttpClient, - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider - ) - { - this.brokerHttpClient = brokerHttpClient; - this.druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER); - } - - /** - * Creates and returns a {@link Request} after choosing a broker. - */ - public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException - { - String host = ClientUtils.pickOneHost(druidNodeDiscovery); - - if (host == null) { - throw DruidException.forPersona(DruidException.Persona.ADMIN) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("A leader node could not be found for [%s] service. Check the logs to validate that service is healthy.", NodeRole.BROKER); - } - return new Request(httpMethod, new URL(StringUtils.format("%s%s", host, urlPath))); - } - - public String sendQuery(final Request request) throws Exception - { - return RetryUtils.retry( - () -> { - Request newRequestUrl = getNewRequestUrl(request); - final StringFullResponseHolder fullResponseHolder = brokerHttpClient.go(newRequestUrl, new StringFullResponseHandler(StandardCharsets.UTF_8)).get(); - - HttpResponseStatus responseStatus = fullResponseHolder.getResponse().getStatus(); - if (HttpResponseStatus.SERVICE_UNAVAILABLE.equals(responseStatus) - || HttpResponseStatus.GATEWAY_TIMEOUT.equals(responseStatus)) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build("Request to broker failed due to failed response status: [%s]", responseStatus); - } - return fullResponseHolder.getContent(); - }, - (throwable) -> { - if (throwable instanceof ExecutionException) { - return throwable.getCause() instanceof IOException || throwable.getCause() instanceof ChannelException; - } - if (throwable instanceof DruidException) { - return ((DruidException) throwable).getCategory() == DruidException.Category.RUNTIME_FAILURE; - } - return throwable instanceof IOE; - }, - MAX_RETRIES - ); - } - - private Request getNewRequestUrl(Request oldRequest) - { - try { - return ClientUtils.withUrl( - oldRequest, - new URL(StringUtils.format("%s%s", ClientUtils.pickOneHost(druidNodeDiscovery), oldRequest.getUrl().getPath())) - ); - } - catch (MalformedURLException e) { - // Not an IOException; this is our own fault. - throw DruidException.defensive( - "Failed to build url with path[%s] and query string [%s].", - oldRequest.getUrl().getPath(), - oldRequest.getUrl().getQuery() - ); - } - } -} diff --git a/server/src/test/java/org/apache/druid/discovery/BrokerClientTest.java b/server/src/test/java/org/apache/druid/discovery/BrokerClientTest.java deleted file mode 100644 index de03877a9b0c..000000000000 --- a/server/src/test/java/org/apache/druid/discovery/BrokerClientTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.discovery; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.name.Names; -import org.apache.druid.guice.GuiceInjectors; -import org.apache.druid.guice.Jerseys; -import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; -import org.apache.druid.guice.LifecycleModule; -import org.apache.druid.guice.annotations.Self; -import org.apache.druid.initialization.Initialization; -import org.apache.druid.java.util.http.client.HttpClient; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.server.DruidNode; -import org.apache.druid.server.initialization.BaseJettyTest; -import org.apache.druid.server.initialization.jetty.JettyServerInitializer; -import org.easymock.EasyMock; -import org.eclipse.jetty.server.Server; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.junit.Assert; -import org.junit.Test; - -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.nio.charset.StandardCharsets; - -public class BrokerClientTest extends BaseJettyTest -{ - private DiscoveryDruidNode discoveryDruidNode; - private HttpClient httpClient; - - @Override - protected Injector setupInjector() - { - final DruidNode node = new DruidNode("test", "localhost", false, null, null, true, false); - discoveryDruidNode = new DiscoveryDruidNode(node, NodeRole.BROKER, ImmutableMap.of()); - - Injector injector = Initialization.makeInjectorWithModules( - GuiceInjectors.makeStartupInjector(), ImmutableList.of( - binder -> { - JsonConfigProvider.bindInstance( - binder, - Key.get(DruidNode.class, Self.class), - node - ); - binder.bind(Integer.class).annotatedWith(Names.named("port")).toInstance(node.getPlaintextPort()); - binder.bind(JettyServerInitializer.class).to(DruidLeaderClientTest.TestJettyServerInitializer.class).in(LazySingleton.class); - Jerseys.addResource(binder, SimpleResource.class); - LifecycleModule.register(binder, Server.class); - } - ) - ); - httpClient = injector.getInstance(BaseJettyTest.ClientHolder.class).getClient(); - return injector; - } - - @Test - public void testSimple() throws Exception - { - DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)).anyTimes(); - - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); - - BrokerClient brokerClient = new BrokerClient( - httpClient, - druidNodeDiscoveryProvider - ); - - Request request = brokerClient.makeRequest(HttpMethod.POST, "/simple/direct"); - request.setContent("hello".getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals("hello", brokerClient.sendQuery(request)); - } - - @Test - public void testRetryableError() throws Exception - { - DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)).anyTimes(); - - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); - - BrokerClient brokerClient = new BrokerClient( - httpClient, - druidNodeDiscoveryProvider - ); - - Request request = brokerClient.makeRequest(HttpMethod.POST, "/simple/flakey"); - request.setContent("hello".getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals("hello", brokerClient.sendQuery(request)); - } - - @Test - public void testNonRetryableError() throws Exception - { - DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); - EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode)).anyTimes(); - - DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)).andReturn(druidNodeDiscovery); - - EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); - - BrokerClient brokerClient = new BrokerClient( - httpClient, - druidNodeDiscoveryProvider - ); - - Request request = brokerClient.makeRequest(HttpMethod.POST, "/simple/error"); - Assert.assertEquals("", brokerClient.sendQuery(request)); - } - - @Path("/simple") - public static class SimpleResource - { - private static int attempt = 0; - - @POST - @Path("/direct") - @Produces(MediaType.APPLICATION_JSON) - public Response direct(String input) - { - if ("hello".equals(input)) { - return Response.ok("hello").build(); - } else { - return Response.serverError().build(); - } - } - - @POST - @Path("/flakey") - @Produces(MediaType.APPLICATION_JSON) - public Response redirecting() - { - if (attempt > 2) { - return Response.ok("hello").build(); - } else { - attempt += 1; - return Response.status(504).build(); - } - } - - @POST - @Path("/error") - @Produces(MediaType.APPLICATION_JSON) - public Response error() - { - return Response.status(404).build(); - } - } -} From 92b64af0fa4e0995dde00ebd60239e0e14157599 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Thu, 3 Apr 2025 12:06:17 -0700 Subject: [PATCH 8/8] Safely deleteted BrokerClient --- .../test/java/org/apache/druid/msq/test/MSQTestBase.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 5f1731008052..08efce8e9106 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -38,7 +38,6 @@ import com.google.inject.util.Providers; import org.apache.calcite.avatica.remote.TypedValue; import org.apache.druid.client.ImmutableSegmentLoadInfo; -import org.apache.druid.client.broker.BrokerClient; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -75,7 +74,6 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.http.client.Request; import org.apache.druid.metadata.input.InputSourceModule; import org.apache.druid.msq.counters.CounterNames; import org.apache.druid.msq.counters.CounterSnapshots; @@ -133,7 +131,6 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; -import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; @@ -438,7 +435,6 @@ public void setUp2() throws Exception segmentManager = new MSQTestSegmentManager(segmentCacheManager); - BrokerClient brokerClient = mock(BrokerClient.class); List modules = ImmutableList.of( binder -> { DruidProcessingConfig druidProcessingConfig = new DruidProcessingConfig() @@ -538,7 +534,6 @@ public String getFormatString() new LookylooModule(), new SegmentWranglerModule(), new HllSketchModule(), - binder -> binder.bind(BrokerClient.class).toInstance(brokerClient), binder -> binder.bind(Bouncer.class).toInstance(new Bouncer(1)) ); // adding node role injection to the modules, since CliPeon would also do that through run method @@ -552,8 +547,6 @@ public String getFormatString() objectMapper.registerModules(sqlModule.getJacksonModules()); objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList()); - doReturn(mock(Request.class)).when(brokerClient).submitSqlQuery(any(ClientSqlQuery.class)); - testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper, injector)); indexingServiceClient = new MSQTestOverlordServiceClient( objectMapper,