Merged
4 changes: 2 additions & 2 deletions docs/content/configuration/index.md
@@ -1254,8 +1254,8 @@ These Historical configurations can be defined in the `historical/runtime.proper
|`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping a segment.|30000 (30 seconds)|
|`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage.|10|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads|
|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that loading a segment is not purely I/O bound: it also involves downloading the segment from deep storage, decompressing it, and memory-mapping it. Depending on CPU and network load, this config could be increased to a higher value.|Number of cores|
|`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. Consider increasing this number when clusters are lagging behind in balancing segments across Historical nodes.|2|

In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. If a Druid bug leaves unaccounted segment files on disk, or some other process writes to the same disk, this check fails segment loading early, before the disk fills up completely, leaving the host otherwise usable.
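For illustration, these settings might look as follows in a Historical's runtime properties file. The path, size, and thread count are made-up example values, not recommendations:

```properties
# Cap each cache location at ~300 GB, but also refuse new segments
# once less than 5% of the disk is free (the freeSpacePercent guard).
druid.segmentCache.locations=[{"path": "/mnt/druid/segment-cache", "maxSize": 300000000000, "freeSpacePercent": 5.0}]
# Explicitly override the load/drop concurrency instead of using the default.
druid.segmentCache.numLoadingThreads=16
```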

@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.JvmUtils;
import org.hibernate.validator.constraints.NotEmpty;

import java.io.File;
@@ -46,7 +47,7 @@ public class SegmentLoaderConfig
private int announceIntervalMillis = 0; // do not background announce

@JsonProperty("numLoadingThreads")
private int numLoadingThreads = 10;
private int numLoadingThreads = JvmUtils.getRuntimeInfo().getAvailableProcessors();
Contributor:
Why this change? It doesn't seem likely to me that the segment loading will be CPU-bound (it's probably I/O bound, I'd guess).

Contributor Author:
Before, the `numLoadingThreads` config wasn't being used. These threads also do the CPU-intensive work of decompressing segments and memory-mapping them. But you are right, the work is probably more I/O bound than CPU bound. The change I made provides more of a lower bound on the minimum number of threads that should be safe for this pool, but I could be swayed to change it back to what it was. FWIW, in our internal setup I have set the config to 2 * the number of CPUs and it has been holding up fine.

Contributor:
It should be configurable.

Contributor Author:
The value is configurable. One can use the `druid.segmentCache.numLoadingThreads` config for this.

Member:
The documentation is not updated; it still says 10 threads.

@samarthjain could you please update the documentation with a more elaborate discussion of how (and why) anyone would possibly want to change this configuration, and in which direction?
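The review thread above boils down to a default-with-override pattern: honor an explicit `druid.segmentCache.numLoadingThreads` setting, otherwise fall back to the CPU count. A minimal sketch under assumed names (the class and method here are hypothetical; in the actual change the default comes from `JvmUtils.getRuntimeInfo().getAvailableProcessors()`):

```java
// Hypothetical sketch of the default-with-override behavior discussed above.
public class NumLoadingThreadsDefault
{
  // Returns the explicitly configured value if present,
  // otherwise the number of available processors.
  public static int resolve(Integer configured)
  {
    if (configured != null) {
      return configured;
    }
    return Runtime.getRuntime().availableProcessors();
  }

  public static void main(String[] args)
  {
    // An explicit config value always wins over the CPU-based default.
    System.out.println(resolve(16));
    System.out.println(resolve(null) >= 1);
  }
}
```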


@JsonProperty("numBootstrapThreads")
private Integer numBootstrapThreads = null;
@@ -32,9 +32,11 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

/**
* Use {@link org.apache.druid.server.coordinator.HttpLoadQueuePeon} for segment load/drops.
@@ -54,21 +56,27 @@ public class ZkCoordinator

private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
private final ExecutorService segmentLoadUnloadService;

@Inject
public ZkCoordinator(
SegmentLoadDropHandler loadDropHandler,
ObjectMapper jsonMapper,
ZkPathsConfig zkPaths,
DruidServerMetadata me,
CuratorFramework curator
CuratorFramework curator,
SegmentLoaderConfig config
)
{
this.dataSegmentChangeHandler = loadDropHandler;
this.jsonMapper = jsonMapper;
this.zkPaths = zkPaths;
this.me = me;
this.curator = curator;
this.segmentLoadUnloadService = Execs.multiThreaded(
config.getNumLoadingThreads(),
"ZKCoordinator--%d"
);
}
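The constructor above sizes a named pool with `Execs.multiThreaded`. As a rough, hedged stand-in for what such a helper provides (the class below is hypothetical, not Druid's implementation): a fixed-size pool whose worker threads follow a name pattern like `"ZKCoordinator--%d"`, which makes the segment load/unload workers easy to spot in thread dumps.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a helper like Execs.multiThreaded.
public class NamedThreadPools
{
  public static ExecutorService multiThreaded(int threads, String nameFormat)
  {
    AtomicInteger counter = new AtomicInteger(0);
    ThreadFactory factory = runnable -> {
      // Each worker gets the pattern name with an incrementing index.
      Thread t = new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
      t.setDaemon(true);
      return t;
    };
    return Executors.newFixedThreadPool(threads, factory);
  }

  public static void main(String[] args) throws Exception
  {
    ExecutorService pool = multiThreaded(2, "ZKCoordinator--%d");
    // Prints the worker thread's name, e.g. ZKCoordinator--0.
    System.out.println(pool.submit(() -> Thread.currentThread().getName()).get());
    pool.shutdown();
  }
}
```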

@LifecycleStart
@@ -102,63 +110,12 @@ public void start() throws IOException
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
{
final ChildData child = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
final String path = child.getPath();
final DataSegmentChangeRequest request = jsonMapper.readValue(
child.getData(), DataSegmentChangeRequest.class
);

log.info("New request[%s] with zNode[%s].", request.asString(), path);

try {
request.go(
dataSegmentChangeHandler,
new DataSegmentChangeCallback()
{
boolean hasRun = false;

@Override
public void execute()
{
try {
if (!hasRun) {
curator.delete().guaranteed().forPath(path);
log.info("Completed request [%s]", request.asString());
hasRun = true;
}
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.error(e, "Exception while removing zNode[%s]", path);
throw new RuntimeException(e);
}
}
}
);
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}

log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", request)
.emit();
}

childAdded(child);
break;
case CHILD_REMOVED:
log.info("zNode[%s] was removed", event.getData().getPath());
@@ -168,6 +125,7 @@ public void execute()
}
}
}

);
loadQueueCache.start();
}
@@ -180,6 +138,59 @@ public void execute()
}
}

private void childAdded(ChildData child)
{
segmentLoadUnloadService.submit(() -> {
final String path = child.getPath();
DataSegmentChangeRequest request = new SegmentChangeRequestNoop();
try {
final DataSegmentChangeRequest finalRequest = jsonMapper.readValue(
child.getData(),
DataSegmentChangeRequest.class
);

finalRequest.go(
dataSegmentChangeHandler,
new DataSegmentChangeCallback()
{
@Override
public void execute()
{
try {
curator.delete().guaranteed().forPath(path);
log.info("Completed request [%s]", finalRequest.asString());
}
catch (Exception e) {
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}
log.error(e, "Exception while removing zNode[%s]", path);
throw new RuntimeException(e);
}
}
}
);
}
catch (Exception e) {
// Something went wrong in either deserializing the request using jsonMapper or when invoking it
try {
curator.delete().guaranteed().forPath(path);
}
catch (Exception e1) {
log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
}

log.makeAlert(e, "Segment load/unload: uncaught exception.")
.addData("node", path)
.addData("nodeProperties", request)
.emit();
}
});
}
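The `childAdded` method above follows a submit-and-clean-up pattern: the Curator watcher thread only enqueues work, a pool worker runs the potentially slow load/drop, and the request znode is deleted whether the request succeeds or fails. A stripped-down sketch of that shape (all names hypothetical; a plain `Runnable` stands in for the change request, and a single `finally` approximates the success-and-failure cleanup paths in the real code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the childAdded pattern: enqueue fast, clean up always.
public class OffloadPattern
{
  public static void handle(ExecutorService pool, Runnable request, Runnable deleteZnode)
  {
    pool.submit(() -> {
      try {
        request.run();       // the potentially slow segment load/drop
      }
      finally {
        deleteZnode.run();   // remove the request node no matter what
      }
    });
  }

  public static void main(String[] args) throws Exception
  {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    handle(pool, () -> System.out.println("loaded segment"), () -> System.out.println("deleted znode"));
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```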

@LifecycleStop
public void stop()
{