Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
toolbox.getDataSegmentServerAnnouncer().announce();

appenderator = appenderator0;

final String topic = ioConfig.getStartPartitions().getTopic();
Expand Down Expand Up @@ -567,6 +569,8 @@ public String apply(DataSegment input)
}
}

toolbox.getDataSegmentServerAnnouncer().unannounce();

return success();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import io.druid.segment.loading.StorageLocationConfig;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.timeline.DataSegment;
import org.apache.curator.test.TestingCluster;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -1522,6 +1523,7 @@ public void close()
null, // DataSegmentMover
null, // DataSegmentArchiver
new TestDataSegmentAnnouncer(),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
makeTimeseriesOnlyConglomerate(),
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;

Expand All @@ -70,6 +71,7 @@ public class TaskToolbox
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentMover dataSegmentMover;
private final DataSegmentAnnouncer segmentAnnouncer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final MonitorScheduler monitorScheduler;
Expand All @@ -93,6 +95,7 @@ public TaskToolbox(
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
ExecutorService queryExecutorService,
Expand All @@ -116,6 +119,7 @@ public TaskToolbox(
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
Expand Down Expand Up @@ -170,6 +174,11 @@ public DataSegmentAnnouncer getSegmentAnnouncer()
return segmentAnnouncer;
}

public DataSegmentServerAnnouncer getDataSegmentServerAnnouncer()
{
return serverAnnouncer;
}

public SegmentHandoffNotifierFactory getSegmentHandoffNotifierFactory()
{
return handoffNotifierFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;

import java.io.File;
import java.util.concurrent.ExecutorService;
Expand All @@ -57,6 +58,7 @@ public class TaskToolboxFactory
private final DataSegmentMover dataSegmentMover;
private final DataSegmentArchiver dataSegmentArchiver;
private final DataSegmentAnnouncer segmentAnnouncer;
private final DataSegmentServerAnnouncer serverAnnouncer;
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
private final ExecutorService queryExecutorService;
Expand All @@ -79,6 +81,7 @@ public TaskToolboxFactory(
DataSegmentMover dataSegmentMover,
DataSegmentArchiver dataSegmentArchiver,
DataSegmentAnnouncer segmentAnnouncer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentHandoffNotifierFactory handoffNotifierFactory,
QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate,
@Processing ExecutorService queryExecutorService,
Expand All @@ -100,6 +103,7 @@ public TaskToolboxFactory(
this.dataSegmentMover = dataSegmentMover;
this.dataSegmentArchiver = dataSegmentArchiver;
this.segmentAnnouncer = segmentAnnouncer;
this.serverAnnouncer = serverAnnouncer;
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate;
this.queryExecutorService = queryExecutorService;
Expand All @@ -126,6 +130,7 @@ public TaskToolbox build(Task task)
dataSegmentMover,
dataSegmentArchiver,
segmentAnnouncer,
serverAnnouncer,
handoffNotifierFactory,
queryRunnerFactoryConglomerate,
queryExecutorService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,6 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
}
}
}

@Override
public boolean isAnnounced(DataSegment segment)
{
return toolbox.getSegmentAnnouncer().isAnnounced(segment);
}
};

// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
Expand Down Expand Up @@ -326,6 +320,8 @@ public String getVersion(final Interval interval)
Supplier<Committer> committerSupplier = null;

try {
toolbox.getDataSegmentServerAnnouncer().announce();

plumber.startJob();

// Set up metrics emission
Expand Down Expand Up @@ -425,6 +421,8 @@ public void run()
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
}
}

toolbox.getDataSegmentServerAnnouncer().unannounce();
}

log.info("Job done!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Interval;
Expand Down Expand Up @@ -102,6 +103,7 @@ public void setUp() throws IOException
mockDataSegmentMover,
mockDataSegmentArchiver,
mockSegmentAnnouncer,
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
mockHandoffNotifierFactory,
mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException
segments.add(segment);
return segment;
}
}, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
}, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
indexMerger, indexIO, null, null, indexMergerV9
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
Expand Down Expand Up @@ -1042,6 +1043,7 @@ Map<SegmentDescriptor, Pair<Executor, Runnable>> getHandOffCallbacks()
null, // DataSegmentMover
null, // DataSegmentArchiver
new TestDataSegmentAnnouncer(),
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
conglomerate,
MoreExecutors.sameThreadExecutor(), // queryExecutorService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException
segments.add(segment);
return segment;
}
}, null, null, null, null, null, null, null, null, new SegmentLoader()
}, null, null, null, null, null, null, null, null, null, new SegmentLoader()
{
@Override
public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException
}
},
null, // segment announcer
null,
notifierFactory,
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public TaskActionClient create(Task task)
null, // segment mover
null, // segment archiver
null, // segment announcer,
null,
notifierFactory,
null, // query runner factory conglomerate corporation unionized collective
null, // query executor service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
Expand Down Expand Up @@ -566,13 +567,8 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
{

}

@Override
public boolean isAnnounced(DataSegment segment)
{
return false;
}
}, // segment announcer
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
handoffNotifierFactory,
queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
MoreExecutors.sameThreadExecutor(), // query executor service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
}
}

@Override
public boolean isAnnounced(DataSegment segment)
{
return announcedSegments.contains(segment);
}

public Set<DataSegment> getAnnouncedSegments()
{
return ImmutableSet.copyOf(announcedSegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,16 @@ public Iterable<DruidServer> getInventory()
{
return null;
}

@Override
public boolean isStarted()
{
return true;
}

@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private WorkerTaskMonitor createTaskMonitor()
new TaskToolboxFactory(
taskConfig,
taskActionClientFactory,
null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(
null,
new SegmentLoaderConfig()
Expand Down
Loading