Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
Expand All @@ -41,9 +42,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class EventReceiverFirehoseTestClient
{
private static final Logger LOG = new Logger(EventReceiverFirehoseTestClient.class);

static final int NUM_RETRIES = 30;
static final long DELAY_FOR_RETRIES_MS = 10000;

private final String host;
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
Expand Down Expand Up @@ -82,31 +89,45 @@ private String getURL()
* @return
*/
public int postEvents(Collection<Map<String, Object>> events, ObjectMapper objectMapper, String mediaType)
throws InterruptedException
{
try {
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getURL()))
.setContent(mediaType, objectMapper.writeValueAsBytes(events)),
StatusResponseHandler.getInstance()
).get();
int retryCount = 0;
while (true) {
try {
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.POST, new URL(getURL()))
.setContent(mediaType, objectMapper.writeValueAsBytes(events)),
StatusResponseHandler.getInstance()
).get();

if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while posting events to url[%s] status[%s] content[%s]",
getURL(),
response.getStatus(),
response.getContent()
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
"Error while posting events to url[%s] status[%s] content[%s]",
getURL(),
response.getStatus(),
response.getContent()
);
}
Map<String, Integer> responseData = objectMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>()
{
}
);
return responseData.get("eventCount");
}
// adding retries to flaky tests using channels
catch (ExecutionException e) {
if (retryCount > NUM_RETRIES) {
throw new RuntimeException(e); //giving up now
} else {
LOG.info(e, "received exception, sleeping and retrying");
retryCount++;
Thread.sleep(DELAY_FOR_RETRIES_MS);
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
Map<String, Integer> responseData = objectMapper.readValue(
response.getContent(), new TypeReference<Map<String, Integer>>()
{
}
);
return responseData.get("eventCount");
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class HttpUtil
private static final Logger LOG = new Logger(AbstractQueryResourceTestClient.class);
private static final StatusResponseHandler RESPONSE_HANDLER = StatusResponseHandler.getInstance();

static final int NUM_RETRIES = 30;
static final long DELAY_FOR_RETRIES_MS = 10000;

public static StatusResponseHolder makeRequest(HttpClient httpClient, HttpMethod method, String url, byte[] content)
{
return makeRequestWithExpectedStatus(
Expand Down Expand Up @@ -78,13 +81,13 @@ public static StatusResponseHolder makeRequestWithExpectedStatus(
response.getContent()
);
// it can take time for the auth config to propagate, so we retry
if (retryCount > 10) {
if (retryCount > NUM_RETRIES) {
throw new ISE(errMsg);
} else {
LOG.error(errMsg);
LOG.error("retrying in 3000ms, retryCount: " + retryCount);
retryCount++;
Thread.sleep(3000);
Thread.sleep(DELAY_FOR_RETRIES_MS);
}
} else {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
DateTime dtLast; // timestamp of last event
DateTime dtGroupBy; // timestamp for expected response for groupBy query

static final int NUM_RETRIES = 60;
static final long DELAY_FOR_RETRIES_MS = 10000;

@Inject
ServerDiscoveryFactory factory;
@Inject
Expand Down Expand Up @@ -106,11 +109,14 @@ void doTest()
postEvents();

// wait for a while to let the events be ingested
ITRetryUtil.retryUntilTrue(
ITRetryUtil.retryUntil(
() -> {
final int countRows = queryHelper.countRows(fullDatasourceName, Intervals.ETERNITY.toString());
return countRows == getNumExpectedRowsIngested();
},
true,
DELAY_FOR_RETRIES_MS,
NUM_RETRIES,
"Waiting all events are ingested"
);

Expand Down Expand Up @@ -152,8 +158,8 @@ void doTest()
ITRetryUtil.retryUntil(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
true,
10000,
60,
DELAY_FOR_RETRIES_MS,
NUM_RETRIES,
"Real-time generated segments loaded"
);

Expand Down