Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
43098ba
Changes to publish realtime segment schema changes in segment announc…
Nov 28, 2023
095c7ea
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Nov 28, 2023
828d150
Fix checkstyle issues
Nov 28, 2023
10856d6
Revert changes in CombiningSequenceTest
Nov 28, 2023
91582fc
Minor changes
Nov 28, 2023
c0de275
Fix build
Nov 29, 2023
abeb6a0
Add javadocs
Nov 29, 2023
fc6249b
minor change
Nov 29, 2023
1c6af13
Minor change
Nov 29, 2023
e9b83d4
Update SegmentSchemas pojo, dev testing
Nov 30, 2023
fac850e
Add tests
Dec 3, 2023
050c708
Remove forbidden apis
Dec 3, 2023
a8a90a8
Remove forbidden api usage
Dec 3, 2023
219e270
Fix tests
Dec 3, 2023
6394616
Update logic to build sink schema
Dec 5, 2023
1315922
Explicitly add __time column in Sink#getSignature
Dec 5, 2023
f9edb58
Remove forbidden api usage
Dec 5, 2023
3a00d9e
Fix spotbug
Dec 5, 2023
03a384d
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Dec 6, 2023
f32d390
Rename config for CentralizedDatasourceSchema feature
Dec 6, 2023
e6cbc92
Minor changes
Dec 6, 2023
6aac669
Fix checkstyle
Dec 8, 2023
bae54c4
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Dec 8, 2023
b5a656e
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Dec 11, 2023
2518495
Add guardrail to prevent enabling the feature with zk based segment a…
Dec 11, 2023
ec56554
checkstyle
Dec 11, 2023
c110e17
Enable segment schema announcement for realtime segment in IT
Dec 12, 2023
b0f8637
minor change
Dec 13, 2023
141158c
null executor service in StreamAppenderator#SinkSchemaAnnouncer if fe…
Dec 13, 2023
92adfa3
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Dec 13, 2023
63d43f5
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Jan 3, 2024
1ffdf80
Address feedback
Jan 3, 2024
74150f8
Minor change
Jan 3, 2024
2986317
Throw exception in Peons if feature is enabled alongwith zk based seg…
Jan 3, 2024
dc7b0ba
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Jan 4, 2024
9316cb0
Minor changes
Jan 4, 2024
0513e80
Rename method in DataSegmentAnnouncer
Jan 4, 2024
2704d1e
Merge remote-tracking branch 'upstream/master' into schema_annoucement
Jan 8, 2024
9fcfb62
Address feedback
Jan 8, 2024
4923414
Add test to achieve coverage
Jan 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ZkPathsConfig;
Expand Down Expand Up @@ -291,6 +292,12 @@ public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}

@Override
public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
{
return CallbackAction.CONTINUE;
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class TaskToolbox

private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public TaskToolbox(
SegmentLoaderConfig segmentLoaderConfig,
Expand Down Expand Up @@ -171,7 +173,8 @@ public TaskToolbox(
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
String attemptId
String attemptId,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
Expand Down Expand Up @@ -215,6 +218,7 @@ public TaskToolbox(
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}

public SegmentLoaderConfig getSegmentLoaderConfig()
Expand Down Expand Up @@ -487,6 +491,11 @@ public RuntimeInfo getAdjustedRuntimeInfo()
return createAdjustedRuntimeInfo(JvmUtils.getRuntimeInfo(), appenderatorsManager);
}

public CentralizedDatasourceSchemaConfig getCentralizedTableSchemaConfig()
{
return centralizedDatasourceSchemaConfig;
}

/**
* Create {@link AdjustedRuntimeInfo} based on the given {@link RuntimeInfo} and {@link AppenderatorsManager}. This
* is a way to allow code to properly apportion the amount of processors and heap available to the entire JVM.
Expand Down Expand Up @@ -553,6 +562,7 @@ public static class Builder
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
private String attemptId;
private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public Builder()
{
Expand Down Expand Up @@ -598,6 +608,7 @@ public Builder(TaskToolbox other)
this.intermediaryDataManager = other.intermediaryDataManager;
this.supervisorTaskClientProvider = other.supervisorTaskClientProvider;
this.shuffleClient = other.shuffleClient;
this.centralizedDatasourceSchemaConfig = other.centralizedDatasourceSchemaConfig;
}

public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
Expand Down Expand Up @@ -840,6 +851,12 @@ public Builder attemptId(final String attemptId)
return this;
}

public Builder centralizedTableSchemaConfig(final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig)
{
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
return this;
}

public TaskToolbox build()
{
return new TaskToolbox(
Expand Down Expand Up @@ -882,7 +899,8 @@ public TaskToolbox build()
supervisorTaskClientProvider,
shuffleClient,
taskLogPusher,
attemptId
attemptId,
centralizedDatasourceSchemaConfig
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand Down Expand Up @@ -114,6 +115,7 @@ public class TaskToolboxFactory
private final ShuffleClient shuffleClient;
private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

@Inject
public TaskToolboxFactory(
Expand Down Expand Up @@ -155,7 +157,8 @@ public TaskToolboxFactory(
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
@AttemptId String attemptId
@AttemptId String attemptId,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
this.segmentLoaderConfig = segmentLoadConfig;
Expand Down Expand Up @@ -197,6 +200,7 @@ public TaskToolboxFactory(
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}

public TaskToolbox build(Task task)
Expand Down Expand Up @@ -260,6 +264,7 @@ public TaskToolbox build(TaskConfig config, Task task)
.shuffleClient(shuffleClient)
.taskLogPusher(taskLogPusher)
.attemptId(attemptId)
.centralizedTableSchemaConfig(centralizedDatasourceSchemaConfig)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,8 @@ private Appenderator newAppenderator(
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler,
isUseMaxMemoryEstimates()
isUseMaxMemoryEstimates(),
toolbox.getCentralizedTableSchemaConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
Expand Down Expand Up @@ -310,6 +311,16 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
}
}
}

@Override
public void announceSegmentSchemas(String taskId, SegmentSchemas sinksSchema, SegmentSchemas sinksSchemaChange)
{
}

@Override
public void removeSegmentSchemasForTask(String taskId)
{
}
};

// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public Appenderator newAppenderator(
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler,
isUseMaxMemoryEstimates()
isUseMaxMemoryEstimates(),
toolbox.getCentralizedTableSchemaConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
Expand Down Expand Up @@ -155,7 +156,8 @@ public void setUp() throws IOException
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
Expand Down Expand Up @@ -1644,7 +1645,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
Expand Down Expand Up @@ -1017,7 +1018,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);

return toolboxFactory.build(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
Expand Down Expand Up @@ -70,7 +71,8 @@ public Appenderator createRealtimeAppenderatorForTask(
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
realtimeAppenderator = Appenderators.createRealtime(
Expand All @@ -93,7 +95,8 @@ public Appenderator createRealtimeAppenderatorForTask(
cachePopulatorStats,
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates
useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
return realtimeAppenderator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.NoopDataSegmentMover;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
Expand Down Expand Up @@ -135,7 +136,8 @@ public void setup() throws IOException
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
runner = new SingleTaskBackgroundRunner(
toolboxFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentTest;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
Expand Down Expand Up @@ -665,7 +666,8 @@ public void announceSegment(DataSegment segment)
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
Expand Down Expand Up @@ -115,7 +116,8 @@ public TestTaskToolboxFactory(
bob.supervisorTaskClientProvider,
bob.shuffleClient,
bob.taskLogPusher,
bob.attemptId
bob.attemptId,
bob.centralizedDatasourceSchemaConfig
);
}

Expand Down Expand Up @@ -159,6 +161,7 @@ public static class Builder
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
private String attemptId;
private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public Builder setConfig(TaskConfig config)
{
Expand Down Expand Up @@ -387,5 +390,10 @@ public Builder setAttemptId(String attemptId)
this.attemptId = attemptId;
return this;
}

public void setCentralizedTableSchemaConfig(CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig)
{
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand Down Expand Up @@ -700,7 +701,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.test;

import com.google.common.collect.Sets;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;

Expand Down Expand Up @@ -57,4 +58,13 @@ public void unannounceSegments(Iterable<DataSegment> segments)
}
}

@Override
public void announceSegmentSchemas(String taskId, SegmentSchemas segmentSchemas, SegmentSchemas segmentSchemasChange)
{
}

@Override
public void removeSegmentSchemasForTask(String taskId)
{
}
}
Loading