Allow use of non-threadsafe ObjectCachingColumnSelectorFactory#4397
…d not be threadsafe.
68413b2 to
1085000
| @@ -366,14 +390,24 @@ public void close() | |||
| // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. | |||
| static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory | |||
| { | |||
Could you please also change the makeXxxColumnSelector methods to use computeIfAbsent() instead of putIfAbsent()?
| @@ -91,6 +92,26 @@ public OffheapIncrementalIndex( | |||
| aggBuffers.add(bb); | |||
| } | |||
There are no compatibility concerns, so I suggest inlining all usages of this constructor and removing it.
I have a similar constructor for OnheapIncrementalIndex, should I inline it as well?
Yes. Also, having 4 boolean parameters in a row at the call sites is error-prone (it is easy to switch them accidentally) and hard to read. Consider replacing one or two of them with two-valued enums.
Yeah, I thought so too. Actually, I was considering creating Builders for On/OffheapIncrementalIndex, would that be better?
Okay, I will go with that.
| if (concurrentEventAdd) { | ||
| longColumnSelectorMap = Maps.newConcurrentMap(); | ||
| floatColumnSelectorMap = Maps.newConcurrentMap(); |
Please use ConcurrentHashMap; the implementation behind Maps.newConcurrentMap() has higher overhead and is less scalable.
It would be nice if you added docs somewhere on when IncrementalIndex must be thread-safe.
Hi @jihoonson, it's already stated together with … But I will add additional remarks/docs together with the declaration of the parameter.
- Replace Maps.newXXXMap() with normal instantiation
- Documentation on when thread safety is required
- Use Builders for On/OffheapIncrementalIndex
| .withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP) | ||
| .build() | ||
| ) | ||
| .setMaxRowCount(1000) |
This test is about groupBy, shouldn't it have .setConcurrentEventAdd(true)?
| boolean sortFacts, | ||
| int maxRowCount | ||
| ) | ||
| public static class Builder |
This builder and OffHeapIncrementalIndex.Builder could be the same class with two building methods: buildOffHeap() and buildOnHeap().
Sure, that can be done.
Actually, I have also considered having OffheapIncrementalIndex.Builder extend OnheapIncrementalIndex.Builder. Would this be better?
I think overriding doesn't fit here; a single IncrementalIndexBuilder with two different building methods is better IMO.
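The single-builder idea from this thread can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the PR: `Index`, `OnheapIndex`, `OffheapIndex`, and `IndexBuilder` are hypothetical stand-ins, and the default values and the `maxRowCount` check are assumed. Only the setter names mirror those visible in the diff.

```java
// Hypothetical, simplified stand-ins for the index classes; not Druid's real code.
abstract class Index {
  final boolean reportParseExceptions;
  final boolean concurrentEventAdd;
  final boolean sortFacts;
  final int maxRowCount;

  Index(IndexBuilder builder) {
    this.reportParseExceptions = builder.reportParseExceptions;
    this.concurrentEventAdd = builder.concurrentEventAdd;
    this.sortFacts = builder.sortFacts;
    this.maxRowCount = builder.maxRowCount;
  }
}

class OnheapIndex extends Index {
  OnheapIndex(IndexBuilder builder) { super(builder); }
}

class OffheapIndex extends Index {
  OffheapIndex(IndexBuilder builder) { super(builder); }
}

class IndexBuilder {
  // Default values are assumptions made for this sketch.
  boolean reportParseExceptions = true;
  boolean concurrentEventAdd = false;
  boolean sortFacts = true;
  int maxRowCount = 0;

  // Each flag is set by name, so call sites cannot silently swap two booleans.
  IndexBuilder setReportParseExceptions(boolean value) { this.reportParseExceptions = value; return this; }
  IndexBuilder setConcurrentEventAdd(boolean value) { this.concurrentEventAdd = value; return this; }
  IndexBuilder setSortFacts(boolean value) { this.sortFacts = value; return this; }
  IndexBuilder setMaxRowCount(int value) { this.maxRowCount = value; return this; }

  // One builder, two building methods, instead of one builder class per index type.
  OnheapIndex buildOnheap() { validate(); return new OnheapIndex(this); }
  OffheapIndex buildOffheap() { validate(); return new OffheapIndex(this); }

  private void validate() {
    if (maxRowCount <= 0) {
      throw new IllegalArgumentException("maxRowCount must be set to a positive value");
    }
  }
}

class IndexBuilderDemo {
  public static void main(String[] args) {
    Index index = new IndexBuilder()
        .setConcurrentEventAdd(true)
        .setMaxRowCount(1000)
        .buildOnheap();
    System.out.println(index.concurrentEventAdd + " " + index.maxRowCount);
  }
}
```

Compared with a constructor taking several positional booleans, the call site names every option it changes and leaves the rest at their defaults.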
| ); | ||
| return prev != null ? prev : newSelector; | ||
| } | ||
| return floatColumnSelectorMap.computeIfAbsent(columnName, delegate::makeFloatColumnSelector); |
computeIfAbsent() could be slower when the majority of calls are expected to find the entry already present. So what I meant is code like this:
FloatColumnSelector existing = floatColumnSelectorMap.get(columnName);
if (existing != null) {
  return existing;
} else {
  return floatColumnSelectorMap.computeIfAbsent(columnName, delegate::makeFloatColumnSelector);
}
But won't this be reflected in the implementation of computeIfAbsent? The source code of the Map interface is as follows:
default V computeIfAbsent(K key,
        Function<? super K, ? extends V> mappingFunction) {
    Objects.requireNonNull(mappingFunction);
    V v;
    if ((v = get(key)) == null) {
        V newValue;
        if ((newValue = mappingFunction.apply(key)) != null) {
            put(key, newValue);
            return newValue;
        }
    }
    return v;
}
Because of how the code is inlined, it might not be identical. And for ConcurrentHashMap the difference is bigger, because computeIfAbsent() always locks, while get() just makes a volatile read.
I see, thanks for the explanation.
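The pattern discussed in this thread, a plain get() first and computeIfAbsent() only on a miss, can be sketched as below. This is an illustration under stated assumptions rather than the PR's code: `CachingFactory` is a hypothetical stand-in for ObjectCachingColumnSelectorFactory, the delegate is modeled as a `Function<String, V>`, and the `concurrentEventAdd` flag is assumed to select the map type as the diff suggests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for ObjectCachingColumnSelectorFactory; not Druid's actual code.
class CachingFactory<V> {
  private final Map<String, V> cache;
  private final Function<String, V> delegate;

  CachingFactory(boolean concurrentEventAdd, Function<String, V> delegate) {
    // Use a concurrent map only when several threads may add events at once.
    this.cache = concurrentEventAdd ? new ConcurrentHashMap<>() : new HashMap<>();
    this.delegate = delegate;
  }

  V make(String columnName) {
    // Fast path: a plain read, expected to hit on the majority of calls.
    V existing = cache.get(columnName);
    if (existing != null) {
      return existing;
    }
    // Slow path: create the value on a miss and return whatever ends up stored.
    return cache.computeIfAbsent(columnName, delegate);
  }
}

class CachingFactoryDemo {
  public static void main(String[] args) {
    AtomicInteger created = new AtomicInteger();
    CachingFactory<String> factory = new CachingFactory<>(true, name -> {
      created.incrementAndGet();
      return "selector:" + name;
    });
    String first = factory.make("metric");
    String second = factory.make("metric");
    System.out.println("same instance: " + (first == second));
    System.out.println("delegate calls: " + created.get());
  }
}
```

The second call is served by get() alone, so the delegate runs once per column name in this single-threaded demo.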
| ); | ||
| return prev != null ? prev : newSelector; | ||
| } | ||
| return longColumnSelectorMap.computeIfAbsent(columnName, delegate::makeLongColumnSelector); |
| ); | ||
| return prev != null ? prev : newSelector; | ||
| } | ||
| return objectColumnSelectorMap.computeIfAbsent(columnName, delegate::makeObjectColumnSelector); |
| ) | ||
| ).build() | ||
| ) | ||
| .setMaxRowCount(NUM_POINTS) |
setReportParseExceptions(false) should be here
| .withMinTimestamp(0) | ||
| .withQueryGranularity(Granularities.NONE) | ||
| .withMetrics(aggs) | ||
| .withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP) |
DEFAULT_ROLLUP (true) is set by default in the builder, so it doesn't need to be specified explicitly.
| return new OnheapIncrementalIndex.Builder() | ||
| .setIncrementalIndexSchema( | ||
| new IncrementalIndexSchema.Builder() | ||
| .withMinTimestamp(0) |
This is the default value, so it doesn't need to be specified explicitly.
| return new OnheapIncrementalIndex.Builder() | ||
| .setIncrementalIndexSchema( | ||
| new IncrementalIndexSchema.Builder() | ||
| .withQueryGranularity(Granularities.NONE) |
This is the default value, so it doesn't need to be specified explicitly.
| new IncrementalIndexSchema.Builder() | ||
| .withQueryGranularity(Granularities.NONE) | ||
| .withMetrics(schemaInfo.getAggsArray()) | ||
| .withDimensionsSpec(DimensionsSpec.ofEmpty()) |
This is the default value, so it doesn't need to be specified explicitly.
BTW, DimensionsSpec.ofEmpty() is not a constant; it may make sense to make it one.
- Constant EMPTY DimensionsSpec
- Improvement on IncrementalIndexSchema.Builder
- Remove setting of default values
- Use var args for metrics
- Correction on On/OffheapIncrementalIndex Builders
- Combine On/OffheapIncrementalIndex Builders
| return new OnheapIncrementalIndex.Builder() | ||
| return new IncrementalIndex.Builder() | ||
| .setIncrementalIndexSchema( | ||
| new IncrementalIndexSchema.Builder() |
This is a very frequent pattern; I suggest adding a shortcut like IncrementalIndex.Builder.setSimpleIncrementalIndexSchema(AggregatorFactory...)
Sure, but it seems that such usage (i.e., setting only metrics) occurs only in non-production code (i.e., tests and benchmarks). Should we add another API only for these uses?
I think it's important to remove boilerplate and repetition from tests as well as from production code, because having to write a lot of boilerplate discourages people from writing tests. Also, you can rename the method setIncrementalIndexSchema() to just setIndexSchema(), because in the context of IncrementalIndex.Builder there is no need to repeat that the index schema is "incremental".
You could call this method setSimpleTestingIndexSchema and annotate it with @VisibleForTesting to emphasize that it should be used only for testing.
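The shortcut suggested here might look roughly like the sketch below. It is an assumption-laden illustration, not the method added in the PR: `SchemaSketch` and `SchemaAwareBuilder` are hypothetical stand-ins, and metrics are modeled as plain strings instead of AggregatorFactory instances so that the example stays self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for IncrementalIndexSchema; metrics are plain strings here.
class SchemaSketch {
  final List<String> metrics;

  SchemaSketch(List<String> metrics) {
    this.metrics = metrics;
  }

  static class Builder {
    private List<String> metrics = Collections.emptyList();

    // Varargs keep call sites short when only a few metrics are listed.
    Builder withMetrics(String... metrics) {
      this.metrics = Arrays.asList(metrics.clone());
      return this;
    }

    SchemaSketch build() {
      return new SchemaSketch(Collections.unmodifiableList(metrics));
    }
  }
}

// Hypothetical stand-in for IncrementalIndex.Builder.
class SchemaAwareBuilder {
  SchemaSketch schema;

  SchemaAwareBuilder setIndexSchema(SchemaSketch schema) {
    this.schema = schema;
    return this;
  }

  // Test-only shortcut: metrics only, defaults for everything else.
  // In a real code base this could carry Guava's @VisibleForTesting annotation.
  SchemaAwareBuilder setSimpleTestingIndexSchema(String... metrics) {
    return setIndexSchema(new SchemaSketch.Builder().withMetrics(metrics).build());
  }
}

class SchemaShortcutDemo {
  public static void main(String[] args) {
    SchemaAwareBuilder builder = new SchemaAwareBuilder().setSimpleTestingIndexSchema("count", "sum");
    System.out.println(builder.schema.metrics);
  }
}
```

The shortcut only wraps the longer setIndexSchema(new SchemaSketch.Builder()...build()) form, so tests that need more than metrics can still use the full schema builder.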
| return this; | ||
| } | ||
| // A helper method to set a simple index schema with only metrics and default values for the other parameters. Note |
Oops, really sorry about that.
| // that this method is normally used for testing and benchmarking; it is unlikely that you would use it in | ||
| // production settings. | ||
| /** A helper method to set a simple index schema with only metrics and default values for the other parameters. Note |
According to Java style, text should start on the next line after /**
Added an additional boolean parameter in the IncrementalIndex constructor (and subsequently the constructors for its subclasses OnheapIncrementalIndex and OffheapIncrementalIndex) and the protected function initAggs to indicate if ConcurrentMap should be used.