
Enable querying entirely cold datasources #16676

Merged
cryptoe merged 32 commits into apache:master from findingrish:cold_ds_schema
Jul 15, 2024

Conversation

Contributor

@findingrish findingrish commented Jul 1, 2024

Issue: #14989

Problem

Currently, the datasource schema doesn’t include columns from cold segments. This makes it impossible to query an entirely cold datasource.

Approach

  • Add a mechanism to backfill schema for cold segments in the metadata database. Note that this is required only for segments created prior to enabling the CentralizedDatasourceSchema feature.
  • Update the datasource schema building logic on the Coordinator to include schema from cold segments.
  • Make Brokers aware of entirely cold datasources.

Backfill schema for cold segments

Leverage the existing schema backfill flow added as part of the CentralizedDatasourceSchema feature. Users must manually load the cold segments by setting their replication factor to 1; once the schema is backfilled (which can be verified in the metadata database), they can unload the segments.

Handling entirely cold datasource

The problem with a cold datasource is that the Broker simply doesn’t know about the datasource if none of its segments are available, so the datasource wouldn’t even appear on the console for querying.
We need a way for the Brokers to be aware of cold datasources, so that they can fetch their schema from the Coordinator.

Currently, Brokers request the schema for available datasources from the Coordinator in each refresh cycle.
With this change, Brokers first poll the set of used datasources from the Coordinator and then request the schema for those datasources.

Once the Broker has the schema for cold datasources, they will show up in the console and become available for querying.
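The polling flow described above can be sketched roughly as follows. This is a hedged illustration only: the CoordinatorClient interface, its method names, and the string-valued schema map are hypothetical stand-ins, not Druid's actual API.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of the Broker refresh cycle described above: poll the set of
// used datasources from the Coordinator first, then fetch their schemas.
// CoordinatorClient and the String-valued schema are illustrative assumptions.
public class BrokerRefreshSketch
{
  public interface CoordinatorClient
  {
    Set<String> fetchDatasourcesWithUsedSegments();

    Map<String, String> fetchSchemaFor(Set<String> dataSources);
  }

  private final Map<String, String> schemaCache = new ConcurrentHashMap<>();

  public void refresh(CoordinatorClient coordinator)
  {
    // Polling *used* datasources (not just available ones) ensures that
    // entirely cold datasources are not missed.
    Set<String> used = coordinator.fetchDatasourcesWithUsedSegments();
    schemaCache.putAll(coordinator.fetchSchemaFor(used));
    // Drop datasources that are no longer used.
    schemaCache.keySet().retainAll(used);
  }

  public Map<String, String> schema()
  {
    return schemaCache;
  }
}
```

Because the cache is keyed off the used-datasource set, a datasource with zero available segments still ends up with a schema entry and becomes queryable.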

Key changes

  • CoordinatorSegmentMetadataCache
    • It runs a scheduled thread to fetch used segments and build datasource schema from cold segments. Hot and cold schemas are merged when the datasource schema is queried.
  • BrokerSegmentMetadataCache
    • The refresh condition is slightly updated; a refresh is executed in each cycle if the feature is enabled.
    • The refresh logic is also updated to poll used datasources from the Coordinator. This way the Broker can fetch cold datasource schema.
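To make the Coordinator-side cold-schema pass concrete, here is a minimal sketch under two stated assumptions: a segment counts as "cold" when it has zero required replicas (deep-storage only), and a datasource's cold schema is the union of its cold segments' columns. The Segment record and field names are hypothetical, not Druid's classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the scheduled cold-schema pass described above.
public class ColdSchemaBuilderSketch
{
  // Hypothetical stand-in for a used segment and its replication status.
  public record Segment(String dataSource, int requiredReplicas, List<String> columns) {}

  public static Map<String, List<String>> buildColdSchema(List<Segment> usedSegments)
  {
    Map<String, List<String>> coldSchema = new HashMap<>();
    for (Segment segment : usedSegments) {
      // Cold: no required replicas, i.e. queryable from deep storage only.
      if (segment.requiredReplicas() == 0) {
        List<String> columns =
            coldSchema.computeIfAbsent(segment.dataSource(), ds -> new ArrayList<>());
        for (String column : segment.columns()) {
          if (!columns.contains(column)) {
            columns.add(column);
          }
        }
      }
    }
    return coldSchema;
  }
}
```

A datasource whose segments are all hot simply never appears in the resulting map, which is what lets the merge step treat cold columns as a pure addition.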

Release Notes

  • The CentralizedDatasourceSchema feature must be enabled in order to query entirely cold datasources; it also enables querying columns that are present only in cold segments.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Contributor

@kfaraz kfaraz left a comment

This PR has some refactors which should be tackled separately, in order to facilitate a smoother review of the core changes here.

return datasourceToUnavailableSegments;
}

public Object2IntMap<String> getDatasourceToDeepStorageQueryOnlySegmentCount()
Contributor

@findingrish, this seems like the only new method that has been added here.
Please remove this new class SegmentReplicationStatusManager and move the code back to DruidCoordinator.

If a refactor is required, please do it in a separate PR.
This PR should focus only on the required changes.

Contributor

Correction: It seems that this method had already existed too.
@findingrish, is there any new code in SegmentReplicationStatusManager?

Contributor Author

@kfaraz There is no new code in SegmentReplicationStatusManager. The reason for refactoring was a cyclic dependency between CoordinatorSegmentMetadataCache and DruidCoordinator while trying to use DruidCoordinator#getSegmentReplicationFactor.

I will raise a separate PR for the refactor.

Contributor

Okay. Can you share some more details on how the cyclic dependency comes into the picture?

Contributor Author

@findingrish findingrish Jul 4, 2024

Currently DruidCoordinator has a dependency on CoordinatorSegmentMetadataCache, for this patch I need to use DruidCoordinator#getSegmentReplicationFactor in CoordinatorSegmentMetadataCache which is resulting in cyclic dependency.

As a solution, I have refactored DruidCoordinator to separate out the code which updates segmentReplicationStatus and broadcastSegments.

Let me know if this solution makes sense.

Contributor

@findingrish, you could just expose a method updateSegmentReplicationStatus() on CoordinatorSegmentMetadataCache. Call this method from DruidCoordinator.UpdateReplicationStatus.run() where we update broadcastSegments and segmentReplicationStatus.

Let me know if this works for you.

Contributor Author

Yeah, this approach would work for me.
However, it seems a bit odd that DruidCoordinator.UpdateReplicationStatus has to additionally update state in some other class; shouldn't the consumer, CoordinatorSegmentMetadataCache, ideally be pulling this information?

Is there a reason to avoid the refactor work?

Contributor

@kfaraz kfaraz Jul 5, 2024

Is there a reason to avoid the refactor work?

Yes, the dependencies are already all over the place which makes the code less readable and also complicates testing. A refactor is needed here but it would have to be thought through a little.

However, it seems a bit odd that DruidCoordinator.UpdateReplicationStatus has to additionally update state in some other class,

Not really. You can think of the DruidCoordinator (or rather the UpdateReplicationStatus duty in this case) as sending a notification to the CoordinatorSegmentMetadataCache saying that the segment replication status has been updated. The DruidCoordinator already sends a notification to the metadata cache about leadership status; this is another notification in the same vein.

Contributor Author

Yes, the dependencies are already all over the place which makes the code less readable and also complicates testing. A refactor is needed here but it would have to be thought through a little.

Yes, this makes sense. DruidCoordinator refactoring would need more thought.

Thanks for the suggestion, I will update the patch.

Contributor

@cryptoe cryptoe left a comment

Left some comments.

/**
* Retrieves list of used datasources.
*/
ListenableFuture<Set<String>> fetchUsedDataSources();
Contributor

Please add the definition of used data sources here.

Contributor Author

I have updated the method name to fetchDatasourcesWithUsedSegments to make it more understandable.
I don't think we need to document what "used segments" means, since the term is widely referenced in the code and docs. For example: https://druid.apache.org/docs/latest/api-reference/data-management-api/#mark-a-single-segment-as-used.

* It contains schema for datasources with atleast 1 available segment.
*/
protected final ConcurrentMap<String, T> tables = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, T> tables = new ConcurrentHashMap<>();
Contributor

Nit: Just wondering which specific HashMap methods you are using that required this change.

Contributor Author

I started using the computeIfAbsent method. The explanation is captured here: https://github.com/code-review-checklists/java-concurrency/blob/master/README.md#chm-type.
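For illustration, the get-or-create pattern being discussed might look like the sketch below. Declaring the field as ConcurrentHashMap rather than ConcurrentMap documents that the code relies on ConcurrentHashMap's guarantee that computeIfAbsent runs the mapping function atomically and at most once per key; the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of why the declared type was narrowed from ConcurrentMap to
// ConcurrentHashMap: the code below depends on ConcurrentHashMap's atomic
// computeIfAbsent semantics, not just the ConcurrentMap interface contract.
public class ComputeIfAbsentSketch
{
  private final ConcurrentHashMap<String, List<String>> tables = new ConcurrentHashMap<>();

  public List<String> columnsFor(String dataSource)
  {
    // Atomic get-or-create: no race window between containsKey and put,
    // and the ArrayList is created at most once per datasource.
    return tables.computeIfAbsent(dataSource, ds -> new ArrayList<>());
  }
}
```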

coldSchemaExec = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("DruidColdSchema-ScheduledExecutor-%d")
.setDaemon(true)
Contributor

Why is this a daemon thread?

Contributor Author

I will update, we don't need a daemon thread here.

cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
coldSchemaExecFuture = coldSchemaExec.schedule(
this::coldDatasourceSchemaExec,
coldSchemaExecPeriodMillis,
Contributor

Is there a specific reason these properties are undocumented?
Do we have any metrics that tell us the performance of this executor service in terms of the number of cold segments backfilled?

Contributor Author

Do we have any metrics that tell us the performance of this executor service in terms of the number of cold segments backfilled

We are not backfilling segments here. It is just looping over the segments, identifying cold segments, and building their schema.
If the datasource schema is updated, it is logged.

Contributor

Since this exec iterates over all the segments, what do we have to figure out how much time the execution took?
Should we publish some summary stats to increase operability?

Contributor Author

@findingrish findingrish Jul 8, 2024

Makes sense. I am logging details if the execution duration is greater than 50 seconds.
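The threshold-based logging agreed on above might look roughly like this; the class name and the helper are assumptions for illustration, with only the 50-second figure taken from the comment.

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the slow-execution check: time the cold-schema pass
// and log details only when it exceeds a fixed threshold.
public class SlowExecutionLogSketch
{
  // Mirrors the 50-second threshold mentioned in the discussion above.
  public static final long SLOWNESS_THRESHOLD_MILLIS = TimeUnit.SECONDS.toMillis(50);

  public static boolean wasSlow(long elapsedMillis)
  {
    return elapsedMillis > SLOWNESS_THRESHOLD_MILLIS;
  }
}
```

A caller would record the elapsed time of the pass (e.g. via System.nanoTime() before and after) and emit the detailed stats log only when wasSlow(...) returns true, keeping the common fast path quiet.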

coldSchemaTable.keySet().retainAll(dataSources);
}

private RowSignature mergeHotAndColdSchema(RowSignature hot, RowSignature cold)
Contributor

I am very surprised you need a new method here. There should be existing logic which does this, no?

Contributor Author

I can refactor this a bit to have a single method for merging the RowSignature.
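One possible shape for such a merge is sketched below. This is a simplification under two assumptions that are not taken from the PR: a RowSignature is represented as an ordered column-to-type map, and the hot column's type wins when both sides define the same column.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hedged sketch of merging hot and cold signatures: the union of both column
// sets, with hot columns listed first and taking precedence on conflicts.
public class SchemaMergeSketch
{
  public static Map<String, String> mergeHotAndCold(Map<String, String> hot, Map<String, String> cold)
  {
    // Start from the hot signature so its column order and types are kept.
    Map<String, String> merged = new LinkedHashMap<>(hot);
    // Add columns that exist only in cold segments.
    cold.forEach(merged::putIfAbsent);
    return merged;
  }
}
```

The key property for this feature is that columns present only in cold segments survive the merge, so they become visible and queryable even when every hot segment lacks them.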

}

// remove any stale datasource from the map
coldSchemaTable.keySet().retainAll(dataSources);
Contributor

Do we have a test case for this?

Contributor Author

Yes, in CoordinatorSegmentMetadataCacheTest#testColdDatasourceSchema_verifyStaleDatasourceRemoved.
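The stale-datasource cleanup under discussion relies on Map.keySet() being a live view of the map, so retainAll on it removes entries from the map itself. A minimal sketch (with hypothetical names) of that behavior:

```java
import java.util.Map;
import java.util.Set;

// Sketch of the cleanup above: drop every datasource from the cold schema
// table that no longer has any cold segments. keySet() is a live view, so
// retainAll mutates the underlying map directly.
public class StaleDatasourceCleanupSketch
{
  public static void retainOnly(Map<String, String> coldSchemaTable, Set<String> dataSourcesWithColdSegments)
  {
    coldSchemaTable.keySet().retainAll(dataSourcesWithColdSegments);
  }
}
```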

rishabh singh added 2 commits July 8, 2024 00:10
);

SegmentReplicaCount segmentReplicaCount = new SegmentReplicaCount();
segmentReplicaCount.setRequired(0, 0);
Contributor

Wouldn't the values already be zero in a fresh instance?

Contributor Author

Right, I was trying to be explicit about setting it to 0, so that it is clear in the test that the given segment is unavailable.

Contributor

You can just add a comment to that effect and/or use test method names that clarify that point.
Invoking this method requires making it public, which doesn't really seem necessary since it is only going to be used in this test.

Contributor Author

Makes sense. Updated.

rishabh singh added 2 commits July 8, 2024 11:27
/**
* Map of datasource and generic object extending DataSourceInformation.
* This structure can be accessed by {@link #cacheExec} and {@link #callbackExec} threads.
* It contains schema for datasources with atleast 1 available segment.
Member

Suggested change
* It contains schema for datasources with atleast 1 available segment.
* It contains schema for datasources with at least 1 available segment.

coldSchemaTable.keySet().retainAll(dataSources);

if (stopwatch.millisElapsed() > COLD_SCHEMA_SLOWNESS_THRESHOLD_MILLIS) {
log.info("Cold schema processing was slow, taking [%d] millis. "
Contributor

The else branch of this should log at debug level.

coldSchemaTable.keySet().retainAll(dataSourceWithColdSegmentSet);

String executionStatsLog = StringUtils.format(
"Cold schema processing was slow, taking [%d] millis. "
Contributor

Suggested change
"Cold schema processing was slow, taking [%d] millis. "
"Cold schema processing took [%d] millis. "


String executionStatsLog = StringUtils.format(
"Cold schema processing was slow, taking [%d] millis. "
+ "Processed [%d] datasources, [%d] segments & [%d] datasourceWithColdSegments.",
Contributor

Suggested change
+ "Processed [%d] datasources, [%d] segments & [%d] datasourceWithColdSegments.",
+ "Processed total [%d] datasources, [%d] segments. Found [%d] datasources with cold segments.",

@cryptoe cryptoe merged commit 6410453 into apache:master Jul 15, 2024
sreemanamala pushed a commit to sreemanamala/druid that referenced this pull request Aug 6, 2024
Add ability to query entirely cold datasources.
@kfaraz kfaraz added this to the 31.0.0 milestone Oct 4, 2024