Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. This option is deprecated. Use `completionTimeout` of KafkaSupervisorIOConfig instead.|no (default == 0)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)|
|`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))|
|`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))|
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler;
import com.metamx.http.client.response.FullResponseHolder;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.indexing.common.RetryPolicy;
import io.druid.indexing.common.RetryPolicyConfig;
import io.druid.indexing.common.RetryPolicyFactory;
Expand All @@ -45,6 +44,7 @@
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.segment.realtime.firehose.ChatHandlerResource;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod;
Expand All @@ -58,6 +58,7 @@
import java.net.Socket;
import java.net.URI;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;

public class KafkaIndexTaskClient
Expand All @@ -84,6 +85,7 @@ public TaskNotRunnableException(String message)
private static final EmittingLogger log = new EmittingLogger(KafkaIndexTaskClient.class);
private static final String BASE_PATH = "/druid/worker/v1/chat";
private static final int TASK_MISMATCH_RETRY_DELAY_SECONDS = 5;
private static final TreeMap EMPTY_TREE_MAP = new TreeMap();

private final HttpClient httpClient;
private final ObjectMapper jsonMapper;
Expand Down Expand Up @@ -270,6 +272,33 @@ public Map<Integer, Long> getCurrentOffsets(final String id, final boolean retry
}
}

/**
 * Fetches the checkpoints (sequence number -> partition offsets) from the task's chat endpoint.
 *
 * @param id    task id whose "checkpoints" resource is queried
 * @param retry whether the underlying HTTP request should be retried on transient failures
 * @return checkpoints keyed by sequence number; an empty map if the task has no known location
 * @throws RuntimeException wrapping any {@link IOException} from the HTTP round trip or JSON parsing
 */
public TreeMap<Integer, Map<Integer, Long>> getCheckpoints(final String id, final boolean retry)
{
  log.debug("GetCheckpoints task[%s] retry[%s]", id, retry);
  try {
    final FullResponseHolder response = submitRequest(id, HttpMethod.GET, "checkpoints", null, retry);
    return jsonMapper.readValue(response.getContent(), new TypeReference<TreeMap<Integer, TreeMap<Integer, Long>>>()
    {
    });
  }
  catch (NoTaskLocationException e) {
    // Return a fresh, properly-typed map instead of the shared raw-typed EMPTY_TREE_MAP constant:
    // a TreeMap is mutable, so handing every caller the same static instance lets any caller
    // corrupt it for all others, and the raw type defeats generic type checking.
    return new TreeMap<>();
  }
  catch (IOException e) {
    throw Throwables.propagate(e);
  }
}

/**
 * Asynchronous variant of {@code getCheckpoints}: runs the blocking fetch on the
 * client's executor and returns a future for the result.
 *
 * @param id    task id whose checkpoints are queried
 * @param retry whether the underlying HTTP request should be retried on transient failures
 * @return future resolving to the checkpoints map (empty if the task has no known location)
 */
public ListenableFuture<TreeMap<Integer, Map<Integer, Long>>> getCheckpointsAsync(
    final String id,
    final boolean retry
)
{
  // Anonymous Callable (rather than a lambda) to match the style of the other
  // *Async methods in this class; behavior is identical.
  return executorService.submit(
      new Callable<TreeMap<Integer, Map<Integer, Long>>>()
      {
        @Override
        public TreeMap<Integer, Map<Integer, Long>> call() throws Exception
        {
          return getCheckpoints(id, retry);
        }
      }
  );
}

public Map<Integer, Long> getEndOffsets(final String id)
{
log.debug("GetEndOffsets task[%s]", id);
Expand All @@ -288,21 +317,21 @@ public Map<Integer, Long> getEndOffsets(final String id)
}
}

public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets)
{
return setEndOffsets(id, endOffsets, false);
}

public boolean setEndOffsets(final String id, final Map<Integer, Long> endOffsets, final boolean resume)
public boolean setEndOffsets(
final String id,
final Map<Integer, Long> endOffsets,
final boolean resume,
final boolean finalize
)
{
log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s]", id, endOffsets, resume);
log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize);

try {
final FullResponseHolder response = submitRequest(
id,
HttpMethod.POST,
"offsets/end",
resume ? "resume=true" : null,
StringUtils.format("resume=%s&finish=%s", resume, finalize),
jsonMapper.writeValueAsBytes(endOffsets),
true
);
Expand Down Expand Up @@ -419,13 +448,8 @@ public Map<Integer, Long> call() throws Exception
);
}

public ListenableFuture<Boolean> setEndOffsetsAsync(final String id, final Map<Integer, Long> endOffsets)
{
return setEndOffsetsAsync(id, endOffsets, false);
}

public ListenableFuture<Boolean> setEndOffsetsAsync(
final String id, final Map<Integer, Long> endOffsets, final boolean resume
final String id, final Map<Integer, Long> endOffsets, final boolean resume, final boolean finalize
)
{
return executorService.submit(
Expand All @@ -434,7 +458,7 @@ public ListenableFuture<Boolean> setEndOffsetsAsync(
@Override
public Boolean call() throws Exception
{
return setEndOffsets(id, endOffsets, resume);
return setEndOffsets(id, endOffsets, resume, finalize);
}
}
);
Expand Down Expand Up @@ -483,7 +507,10 @@ private FullResponseHolder submitRequest(

Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(id);
if (!status.isPresent() || !status.get().isRunnable()) {
throw new TaskNotRunnableException(StringUtils.format("Aborting request because task [%s] is not runnable", id));
throw new TaskNotRunnableException(StringUtils.format(
"Aborting request because task [%s] is not runnable",
id
));
}

String host = location.getHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final int maxRowsPerSegment;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
@Deprecated
private final int maxPendingPersists;
private final IndexSpec indexSpec;
private final boolean reportParseExceptions;
@Deprecated
private final long handoffConditionTimeout;
private final boolean resetOffsetAutomatically;

Expand Down Expand Up @@ -69,7 +69,7 @@ public KafkaTuningConfig(
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
this.basePersistDirectory = defaults.getBasePersistDirectory();
this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists;
this.maxPendingPersists = 0;
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
this.reportParseExceptions = reportParseExceptions == null
? defaults.isReportParseExceptions()
Expand Down Expand Up @@ -127,6 +127,7 @@ public File getBasePersistDirectory()

@Override
@JsonProperty
@Deprecated
public int getMaxPendingPersists()
{
return maxPendingPersists;
Expand Down Expand Up @@ -156,7 +157,6 @@ public boolean isReportParseExceptions()
return reportParseExceptions;
}

@Deprecated
@JsonProperty
public long getHandoffConditionTimeout()
{
Expand Down
Loading