Materialized view implementation #5556
Conversation
@zhangxinyu1 thanks for raising this PR! Would you add a link to the proposal here?

Oh, never mind. It's already here.

I restarted Travis. @zhangxinyu1 would you check the TeamCity inspection failure?

8d328ac to fdf16a9

@zhangxinyu1 thanks for the fix. I'll start my review. BTW, did you have a chance to test this feature in some real clusters?

@jihoonson Thanks!
jihoonson
left a comment
Reviewed up to MaterializedViewMetadataCoordinator.

I believe we will have more types of views in the future. Please use a more specific name like derivativeDataSource.

BTW, this annotation is not needed since you added a NamedType here.

nit: Can be simplified to this.baseDataSource = Preconditions.checkNotNull(baseDataSource, "baseDataSource cannot be null. This is not a valid DerivativeDataSourceMetadata.");

Looks like the logic is almost the same as equals(). It would be better to call equals() here.

Then, this should throw UnsupportedOperationException. If this causes a problem, you might need to add methods like isMergeable() and isSubtractable() to the DataSourceMetadata interface.

Same here. This should throw UnsupportedOperationException.

This method doesn't throw Exception.

nit: unnecessary type arguments.

Probably this method should be merged into IndexerSQLMetadataStorageCoordinator.resetDataSourceMetadata(): that method should check whether an entry already exists in the metastore and insert a new entry if it doesn't; otherwise, it can update the existing entry.

Yes, this method can be merged into IndexerSQLMetadataStorageCoordinator.resetDataSourceMetadata(). However, maybe we can do it in another PR, because we should consider the logic of the code that uses this method.

This method should return only used segments. Please add a method like getUsedSegmentsForInterval() which returns List<Pair<DataSegment, String>> to IndexerMetadataStorageCoordinator.

maxCreatedDate is a less intuitive name.
@zhangxinyu1 that is great! I think it would be enough. I'll test this PR in our cluster as well.

6627334 to ee6fec7

@jihoonson I have modified the code according to your comments. Could you please go on to review it?

@zhangxinyu1 sure. I'll review tomorrow.
jihoonson
left a comment
@zhangxinyu1 still reviewing. Reviewed up to DataSourceOptimizer.

Would you elaborate more on why this feature is split into two extensions? If we need to always load both extensions to use this feature, it would be better to make a single extension.

I can't agree with you more. However, DataSourceOptimizer needs BrokerServerView to get the timelines of different dataSources to do the optimization, and only the broker has this information. Then the materialized-view-selection module has to be loaded only in the broker, so I had to split the feature into two extensions. I thought about this for a long time, but cannot figure out how to solve this problem. Do you have any suggestions?

Do you mean that materialized-view-maintenance should be loaded only in overlords while materialized-view-selection should be loaded only in brokers?

materialized-view-selection should be loaded only in brokers, but materialized-view-maintenance can be loaded anywhere.

Ah, ok. We don't have a nice way to do this currently. I think it's fine to go with it as it is. Would you please add some comments about this, especially that materialized-view-selection should be loaded only in brokers?

Sure, I'm working on your comments these days. Thanks very much!

Please use Intervals.ETERNITY instead.

Intervals.ETERNITY doesn't work well when comparing to a varchar in the metastore.

Would you let me know which error you saw?

Intervals.ETERNITY = "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z".
When we use it to compare against the start and end of segments to get all segments from the metastore, such as:
select * from druid_segments where start > '-146136543-09-08T08:23:32.096Z' and end < '146140482-04-24T15:36:27.903Z';
an empty set is returned, because no end is less than '146140482-04-24T15:36:27.903Z' when compared as a string.
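To make the failure concrete, here is a minimal, self-contained sketch (the bound strings are the ETERNITY endpoints quoted above; the class name and the 2018 segment dates are illustrative). Varchar comparison is lexicographic, not chronological, so a normal segment end such as "2018-..." sorts after "146140482-..." because '2' > '1'.

```java
public class EternityVarcharComparison
{
  // start/end of Intervals.ETERNITY as they would be stored as varchar in the metastore
  static final String ETERNITY_START = "-146136543-09-08T08:23:32.096Z";
  static final String ETERNITY_END = "146140482-04-24T15:36:27.903Z";

  // mimics the WHERE clause: start > :start AND end < :end, evaluated as strings
  public static boolean matchedByVarcharRange(String start, String end)
  {
    return start.compareTo(ETERNITY_START) > 0 && end.compareTo(ETERNITY_END) < 0;
  }

  public static void main(String[] args)
  {
    // lexicographically '2' > '1', so end < ETERNITY_END is false for any modern date
    System.out.println(
        matchedByVarcharRange("2018-01-01T00:00:00.000Z", "2018-01-02T00:00:00.000Z")
    ); // prints false
  }
}
```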
Looks like DEFAULT_MAX_TASK_COUNT.

The line indentation is not correct. Please adjust it.

Please break the line like
Preconditions.checkNotNull(
    baseDataSource,
    "baseDataSource cannot be null. Please provide a baseDataSource."
);
Same for the following 3 lines.

Please add some javadoc.

This should be a final non-static variable.

serverView is used in the optimize method, and that method is static.

I mean, this should be a final non-static variable because it's quite dangerous. As you said, serverView is used in a static method (optimize()), but it is initialized in the constructor. As you know, static methods can be called without creating an instance, which means serverView might not be initialized when optimize() is called. This currently works because Guice initializes DataSourceOptimizer when DataSourceOptimizerMonitor is initialized, and this happens to be before optimize() is called. However, it might break in the future if something changes, e.g. if someone decides to make DataSourceOptimizerMonitor configurable and disables it.

Thanks, you're right. I'll modify it.

Please rename to DataSourceOptimizer.

These variables represent the metrics of dataSourceOptimizer, which means dataSourceOptimizer needs to keep some state. Why don't we simply make this a singleton instance?

DataSourceOptimizer is a singleton instance, and I use static because the optimize method is static.
As for "Why don't we simply make this a singleton instance?" - do you mean I should write another class (e.g. DataSourceOptimizerMetrics) to record these states?

Oh, you're right. It's a singleton. Then I wonder why you made the optimize() method static. Usually static methods are useful when a class doesn't have to keep any state (like util classes). But DataSourceOptimizer does keep state (that is, metrics).
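A minimal sketch of the hazard being discussed (the class and field names here are illustrative, not the PR's actual code): a static field that is only assigned in the constructor is observable as null by any static method called before the first instance exists.

```java
public class StaticViewHolder
{
  // anti-pattern under discussion: a static field assigned only in the constructor
  private static Object serverView;

  public StaticViewHolder(Object view)
  {
    serverView = view;
  }

  // a static method reading the field sees null until an instance is created
  public static boolean isViewReady()
  {
    return serverView != null;
  }
}
```

Calling StaticViewHolder.isViewReady() before any instance exists returns false, which is exactly the window in which a static optimize() would hit a NullPointerException. Making the field final and non-static, and optimize() an instance method, removes that window entirely.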
@zhangxinyu1 left more comments. It looks like a nice start for supporting this kind of cool feature!
Also please add some documentation. I would love to test this in my cluster!

This should be outside of the try clause (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Lock.html).

Also, probably this should be writeLock().

It looks like these maps are synchronized with the lock. If so, they don't have to be ConcurrentHashMaps.

Also please leave some comments about what these maps mean.

The lock is mainly used to synchronize all stats in the getAndResetStats() method. In getAndResetStats(), we take snapshots of the stats one by one and then clear all stats. I use the lock to ensure the stats cannot change between these steps.
I use ConcurrentHashMap because, in optimize(), each stat is incremented concurrently.

I'm not sure I understood correctly, but if my new comments are correct, readLock() and writeLock() should be used in getAndResetStats() and optimize(), respectively. If so, a concurrent map is not needed because only one thread can write at a time in optimize(), and all threads can read without contention in getAndResetStats().

In my design, many threads are allowed to call optimize() simultaneously, because MaterializedViewQuery needs to be optimized concurrently, so I use readLock() in optimize(). It means these stats can be read and changed concurrently by those threads.
However, when a thread calls getAndResetStats() to take a full snapshot of the stats, the stats are not allowed to change. Therefore, I use writeLock() there to block calls to optimize().

Ok. Please add some comments about this.
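A sketch of the scheme the thread converges on (field and method names are illustrative, not the PR's actual code): updaters take the read lock, so many optimize() calls can record stats concurrently via ConcurrentHashMap and AtomicLong, while getAndResetStats() takes the write lock so that no update can interleave between copying the snapshot and clearing the counters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class OptimizerStats
{
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // ConcurrentHashMap because many read-lock holders may update it at the same time
  private final ConcurrentHashMap<String, AtomicLong> hitCount = new ConcurrentHashMap<>();

  public void recordHit(String dataSource)
  {
    lock.readLock().lock(); // shared: many optimize() calls may run together
    try {
      hitCount.computeIfAbsent(dataSource, k -> new AtomicLong()).incrementAndGet();
    }
    finally {
      lock.readLock().unlock(); // unlock in finally, per the Lock javadoc comment above
    }
  }

  public Map<String, Long> getAndResetStats()
  {
    lock.writeLock().lock(); // exclusive: no updates while snapshotting and clearing
    try {
      Map<String, Long> snapshot = new HashMap<>();
      hitCount.forEach((k, v) -> snapshot.put(k, v.get()));
      hitCount.clear();
      return snapshot;
    }
    finally {
      lock.writeLock().unlock();
    }
  }
}
```

Note the deliberately inverted use of the read/write lock: "read" here means "update one counter concurrently" and "write" means "observe and reset everything atomically", which is the design zhangxinyu1 describes.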
nit: can be Collections.singletonList(query).

Please rename to a more intuitive name. Looks like DerivativeDataSource?

Should be Objects.hash(VIEW, query).

Is this class needed in the current implementation?

No, it's useless. Should I remove it?

Please throw an exception if the query type is unknown.

I think it's better to add a method to DimFilter which returns all required column names.

Yes, it's better if we add this method, because the current approach will miss cases when there is a new implementation of DimFilter. But do you think I should add this method in this PR?

I think it's up to you. If you don't want to make this PR bigger, please raise an issue for this.

Probably this should be readLock().

Also, probably this should be writeLock().
ee6fec7 to 6887cb0

The rough documentation about how to use this feature is at the front of this PR. Should I add some documentation to

@zhangxinyu1 yes, you can add docs to the directory under $DRUID/docs/content/development/extensions-contrib like other extensions.

6887cb0 to 4a6a372

@zhangxinyu1 thanks for the update! I didn't realize that. I'll take another look and do some tests in our cluster. BTW, a recent change (#5583) merged into master includes a change to the signature of HadoopTuningConfig which makes this PR fail to merge. Would you update this PR?

4a6a372 to 0ab2bcd

@jihoonson Thanks for the reminder. I have updated it.

b879328 to 1b452d6
jihoonson
left a comment
@zhangxinyu1 thanks for the update. I left my last comments. I also tested this PR on my local machine. It works nicely!

Please add that this feature currently requires a Hadoop cluster.

Would you check this comment?

Same here. The null check is unnecessary.

I suggest modifying List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval); to return List<Pair<DataSegment, String>> rather than adding a new method.

I don't know. I just think that when someone calls getUsedSegmentsForInterval, maybe they don't want the created-date information.
Maybe created_date should be a part of DataSegment. In this way, we only need the method List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval);. What do you think?

The only usage of getUsedSegmentsForInterval() is SegmentAllocateAction. It checks whether any segments are already allocated for the given interval before allocating a new segment id. I think it can just ignore the createdDate part.
As for "Maybe created_date should be a part of DataSegment" - hmm, that's a good point. It sounds good, but I'm not sure why created_date is not a part of DataSegment itself. @gianm any idea?

Alright, let me raise an issue for this and merge these two methods in another PR, because it affects about 16 classes.

Sounds good. Please go for it.
1b452d6 to 1938ce5
Could just return the boolean expression.

Can we do tuningConfigForTask.withVersion instead?

I'm afraid not, because though the withVersion function can set a new version, it cannot set useExplicitVersion = true.

It'd be kinda nice to make UnionDataSource support QueryDataSources and reuse it to run a list of queries.

I don't understand. Could you please describe it in more detail? Thanks!

Sure - not an important suggestion, so please ignore it if it seems irrelevant or too much work :)
In order to execute a materialized view query we have to issue multiple queries on different intervals and merge their results. That might be a more generally useful component where users can union multiple queries rather than just multiple datasources.

If it's easy to do I think it'd be worth supporting UnionDataSources as well. Would it just be a matter of iterating over a list of datasource names, running the rest of this method, and flattening the resulting list of queries?

Thanks for your suggestion. The current implementation supports UnionDataSource in this way: in UnionQueryRunner, a UnionDataSource is transformed into some TableDataSources, and then these TableDataSources are optimized in DataSourceOptimizer.java. Is this ok?

Ah got you, thanks for the explanation!
1938ce5 to 3623f9b

@Dylan1312 Could you please trigger the Travis CI build?

Afraid I don't have the appropriate permission; a committer should be able to help you out.

You can always close and reopen the PR to restart the build ...

@Dylan1312 Thanks!

@b-slim It works, thanks!

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

public class DatasourceOptimizer
Please forgive me for posting this here - I'm not a committer/reviewer, so my feedback does not count, but there is one thing that looks incorrect to me:
Class DatasourceOptimizer states that
"Derived dataSource with smallest average size of segments have highest priority to replace the datasource in user query"
and accordingly the following lines produce this prioritized collection of derivatives:
// get all derivatives for datasource in query. The derivatives set is sorted by average size per segment granularity.
ImmutableSortedSet<Derivative> derivatives = DerivativesManager.getDerivatives(datasourceName);
However, a few lines below, items from the above collection named "derivatives", which is sorted by priority, get selected and put into the following collection, which is simply a HashSet - not sorted, and, according to the javadoc, also not guaranteed to preserve insertion order:
Set<Derivative> derivativesWithRequiredFields = Sets.newHashSet();
To my understanding, derivativesWithRequiredFields should be a List or a LinkedHashSet so that it is guaranteed that the best derivative gets consulted first.
Thanks.

@sascha-coenen thanks for your attention and suggestion.
Please see the latest version of DataSourceOptimizer here: https://github.com/druid-io/druid/pull/5556/files#diff-250d80eb8afc10c49ee91e41d8f9d91c
derivativesWithRequiredFields is sorted when it is used, as follows:
for (DerivativeDataSource derivativeDataSource : ImmutableSortedSet.copyOf(derivativesWithRequiredFields))
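For reference, a small illustration of the point resolved above (class and method names are illustrative; java.util.TreeSet stands in for Guava's ImmutableSortedSet.copyOf to stay dependency-free, and the integers play the role of average segment sizes): a HashSet discards the priority order, but copying into a sorted set just before iteration restores it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DerivativePriority
{
  // A HashSet gives no ordering guarantee, so the sort order established earlier
  // is lost; copying into a sorted set before iterating restores it (the PR does
  // this with ImmutableSortedSet.copyOf(derivativesWithRequiredFields)).
  public static List<Integer> inPriorityOrder(Set<Integer> unordered)
  {
    return new ArrayList<>(new TreeSet<>(unordered));
  }

  public static void main(String[] args)
  {
    Set<Integer> avgSegmentSizes = new HashSet<>();
    avgSegmentSizes.add(300);
    avgSegmentSizes.add(100);
    avgSegmentSizes.add(200);
    System.out.println(inPriorityOrder(avgSegmentSizes)); // prints [100, 200, 300]
  }
}
```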
I'm going to remove

All right. I'm going to merge this PR shortly.

Merged. @zhangxinyu1 thank you for the contribution!

@jihoonson Thanks! I will work on the related issues #5710 and #5775 these days.
Target
To optimize queries.
Implementation
There are two extensions, namely materialized-view-maintenance and materialized-view-selection.
In materialized-view-maintenance, MaterializedViewSupervisor is used to generate or drop derived-datasource segments and to keep the timelines of the base datasource and the derived datasource consistent.
In materialized-view-selection, MaterializedViewQuery is implemented to do materialized-view selection for topn/groupby/timeseries queries.
The detailed design and discussion are in issue #5304.
Usage