Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,27 +348,12 @@ public ListenableFuture<ChangeRequestsSnapshot<DataSegmentChangeRequest>> getSeg
synchronized (lock) {
Iterable<DataSegmentChangeRequest> segments = Iterables.transform(
segmentLookup.keySet(),
new Function<>()
{
@Nullable
@Override
public SegmentChangeRequestLoad apply(DataSegment input)
{
return new SegmentChangeRequestLoad(input);
}
}
SegmentChangeRequestLoad::new
);

Iterable<DataSegmentChangeRequest> sinkSchema = Iterables.transform(
taskSinkSchema.values(),
new Function<>()
{
@Override
public SegmentSchemasChangeRequest apply(SegmentSchemas input)
{
return new SegmentSchemasChangeRequest(input);
}
}
SegmentSchemasChangeRequest::new
);
Iterable<DataSegmentChangeRequest> changeRequestIterables = Iterables.concat(segments, sinkSchema);
SettableFuture<ChangeRequestsSnapshot<DataSegmentChangeRequest>> future = SettableFuture.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.server.http.SegmentLoadingMode;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* Contains {@link State} of a {@link DataSegmentChangeRequest} and failure
Expand All @@ -39,23 +41,51 @@ public enum State
private final State state;
@Nullable
private final String failureCause;
private final SegmentLoadingMode loadingMode;

public static final SegmentChangeStatus SUCCESS = new SegmentChangeStatus(State.SUCCESS, null);
public static final SegmentChangeStatus PENDING = new SegmentChangeStatus(State.PENDING, null);
private static final SegmentChangeStatus SUCCESS = new SegmentChangeStatus(State.SUCCESS, null, null);
private static final SegmentChangeStatus PENDING = new SegmentChangeStatus(State.PENDING, null, null);

public static SegmentChangeStatus success()
{
return SUCCESS;
}

public static SegmentChangeStatus success(SegmentLoadingMode loadingMode)
{
return new SegmentChangeStatus(State.SUCCESS, null, loadingMode);
}

public static SegmentChangeStatus pending()
{
return PENDING;
}

public static SegmentChangeStatus pending(SegmentLoadingMode loadingMode)
{
return new SegmentChangeStatus(State.PENDING, null, loadingMode);
}

public static SegmentChangeStatus failed(String cause, SegmentLoadingMode loadingMode)
{
return new SegmentChangeStatus(State.FAILED, cause, loadingMode);
}

public static SegmentChangeStatus failed(String cause)
{
return new SegmentChangeStatus(State.FAILED, cause);
return new SegmentChangeStatus(State.FAILED, cause, null);
}

@JsonCreator
private SegmentChangeStatus(
@JsonProperty("state") State state,
@JsonProperty("failureCause") @Nullable String failureCause
@JsonProperty("failureCause") @Nullable String failureCause,
@JsonProperty("loadingMode") @Nullable SegmentLoadingMode loadingMode
)
{
this.state = Preconditions.checkNotNull(state, "state must be non-null");
this.failureCause = failureCause;
this.loadingMode = loadingMode;
}

@JsonProperty
Expand All @@ -71,12 +101,41 @@ public String getFailureCause()
return failureCause;
}

@Nullable
@JsonProperty
public SegmentLoadingMode getLoadingMode()
{
return loadingMode;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentChangeStatus that = (SegmentChangeStatus) o;
return state == that.state
&& Objects.equals(failureCause, that.failureCause)
&& loadingMode == that.loadingMode;
}

@Override
public int hashCode()
{
return Objects.hash(state, failureCause, loadingMode);
}

@Override
public String toString()
{
return "SegmentChangeStatus{" +
"state=" + state +
", failureCause='" + failureCause + '\'' +
", loadingMode=" + loadingMode +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.server.coordination;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -57,7 +58,7 @@
* Responsible for loading and dropping of segments by a process that can serve segments.
*/
@ManageLifecycle
public class SegmentLoadDropHandler implements DataSegmentChangeHandler
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason to remove the interface?
I think this class is one of the main implementations of it, so if required, isn't it possible to modify the interface itself instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wouldn't make sense to modify the interface since a DataSegmentChangeRequest itself is not aware of the loading mode.

Having SegmentLoadDropHandler implement the handler interface doesn't seem to serve any real purpose, since we are already creating a separate handler inside processRequest method. That handler then calls the outside add or remove methods. So I decided to just get rid of that interface.

For now, this seemed the cleanest approach. We can add it back in the future, if needed.

Please let me know what you think.

public class SegmentLoadDropHandler
{
private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class);

Expand Down Expand Up @@ -125,6 +126,7 @@ public SegmentLoadDropHandler(
this.normalLoadExec = normalLoadExec;
this.turboLoadExec = turboLoadExec;

// Allow core threads to time out to save resources when not in turbo mode
this.turboLoadExec.allowCoreThreadTimeOut(true);

this.segmentsToDelete = new ConcurrentSkipListSet<>();
Expand All @@ -141,12 +143,15 @@ public Map<String, SegmentRowCountDistribution> getRowCountDistributionPerDataso
return segmentManager.getRowCountDistribution();
}

@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
public void addSegment(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have an internal overloaded method instead if we wanted to keep the interface

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied to the other comment.
No point adding two methods since only one of them is ever meant to be used in practice.

The contract of SegmentLoadDropHandler can probably be cleaned up further by making addSegment and removeSegment private. But that didn't seem necessary now.

DataSegment segment,
@Nullable DataSegmentChangeCallback callback,
SegmentLoadingMode loadingMode
)
{
SegmentChangeStatus result = null;
try {
log.info("Loading segment[%s]", segment.getId());
log.info("Loading segment[%s] in mode[%s]", segment.getId(), loadingMode);
/*
The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts,
and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment
Expand Down Expand Up @@ -179,13 +184,14 @@ each time when addSegment() is called, it has to wait for the lock in order to m
throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId());
}

result = SegmentChangeStatus.SUCCESS;
result = SegmentChangeStatus.success(loadingMode);
}
catch (Throwable e) {
log.makeAlert(e, "Failed to load segment")
.addData("segment", segment)
.emit();
result = SegmentChangeStatus.failed(e.toString());
Throwable rootCause = Throwables.getRootCause(e);
result = SegmentChangeStatus.failed(rootCause.toString(), loadingMode);
}
finally {
updateRequestStatus(new SegmentChangeRequestLoad(segment), result);
Expand All @@ -195,7 +201,6 @@ each time when addSegment() is called, it has to wait for the lock in order to m
}
}

@Override
public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
removeSegment(segment, callback, true);
Expand Down Expand Up @@ -242,7 +247,7 @@ void removeSegment(
runnable.run();
}

result = SegmentChangeStatus.SUCCESS;
result = SegmentChangeStatus.success();
}
catch (Exception e) {
log.makeAlert(e, "Failed to remove segment")
Expand Down Expand Up @@ -322,11 +327,13 @@ public void addSegment(
@Nullable DataSegmentChangeCallback callback
)
{
requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING));
getExecutorService(segmentLoadingMode).submit(
final SegmentChangeStatus pendingStatus = SegmentChangeStatus.pending(segmentLoadingMode);
requestStatuses.put(changeRequest, new AtomicReference<>(pendingStatus));
getLoadingExecutor(segmentLoadingMode).submit(
() -> SegmentLoadDropHandler.this.addSegment(
((SegmentChangeRequestLoad) changeRequest).getSegment(),
() -> resolveWaitingFutures()
() -> resolveWaitingFutures(),
segmentLoadingMode
)
);
}
Expand All @@ -337,7 +344,7 @@ public void removeSegment(
@Nullable DataSegmentChangeCallback callback
)
{
requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING));
requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.pending()));
SegmentLoadDropHandler.this.removeSegment(
((SegmentChangeRequestDrop) changeRequest).getSegment(),
() -> resolveWaitingFutures(),
Expand Down Expand Up @@ -428,7 +435,7 @@ public boolean cancel(boolean interruptIfRunning)
}
}

private ExecutorService getExecutorService(SegmentLoadingMode loadingMode)
private ExecutorService getLoadingExecutor(SegmentLoadingMode loadingMode)
{
return loadingMode == SegmentLoadingMode.TURBO ? turboLoadExec : normalLoadExec;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.error.InvalidInput;
import org.joda.time.Duration;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -50,12 +50,13 @@ public HttpLoadQueuePeonConfig(
{
this.hostTimeout = Configs.valueOrDefault(hostTimeout, Duration.standardMinutes(5));
this.repeatDelay = Configs.valueOrDefault(repeatDelay, Duration.standardMinutes(1));

if (batchSize != null && batchSize < 1) {
throw new RE("Batch size must be greater than 0.");
}

this.batchSize = batchSize;

InvalidInput.conditionalException(
batchSize == null || batchSize >= 1,
"'druid.coordinator.loadqueuepeon.http.batchSize'[%s] must be greater than 0",
batchSize
);
}

@Nullable
Expand Down
Loading
Loading