Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,14 @@ public void emit(Event event)
{
}
},
baseClient, warehouse, retryConfig, jsonMapper, serverConfig, null, new CacheConfig()
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
retryConfig,
jsonMapper,
serverConfig,
null,
new CacheConfig()
);

defineMocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1699,5 +1699,85 @@
}
}
]
},
{
"description": "groupBy on lookup",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "lookup",
"lookup": "wiki-simple"
},
"intervals": [
"0000-01-01T00:00:00.000/3000-01-01T00:00:00.000"
],
"granularity": "all",
"dimensions": ["k", "v", "nonexistent"],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"k": "Wikipedia:Vandalismusmeldung",
"v": "lookup!",
"nonexistent": null,
"rows": 1
}
}
]
},
{
"description": "groupBy on inline",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "inline",
"columnNames": ["k", "v"],
"columnTypes": ["string", "string"],
"rows": [
["Wikipedia:Vandalismusmeldung", "inline!"]
]
},
"intervals": [
"0000-01-01T00:00:00.000/3000-01-01T00:00:00.000"
],
"granularity": "all",
"dimensions": ["k", "v", "nonexistent"],
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"k": "Wikipedia:Vandalismusmeldung",
"v": "inline!",
"nonexistent": null,
"rows": 1
}
}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,18 @@ public FluentQueryRunner applyPreMergeDecoration()
}

/**
 * Emits CPU time metrics to the given emitter.
 *
 * Convenience overload that delegates to {@link #emitCPUTimeMetric(ServiceEmitter, AtomicLong)}
 * with a fresh accumulator starting at zero, for callers that do not need to observe the
 * accumulated CPU time themselves.
 *
 * @param emitter emitter that receives the CPU time metric
 */
public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter)
{
  return emitCPUTimeMetric(emitter, new AtomicLong(0L));
}

public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter, AtomicLong accumulator)
{
return from(
CPUTimeMetricQueryRunner.safeBuild(
baseRunner,
toolChest,
emitter,
new AtomicLong(0L),
accumulator,
true
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,18 @@ public boolean isGlobal()
@Override
public boolean isConcrete()
{
return false;
return true;
}

public Map<String, ValueType> getRowSignature()
{
final ImmutableMap.Builder<String, ValueType> retVal = ImmutableMap.builder();

for (int i = 0; i < columnNames.size(); i++) {
retVal.put(columnNames.get(i), columnTypes.get(i));
final ValueType columnType = columnTypes.get(i);
if (columnType != null) {
retVal.put(columnNames.get(i), columnType);
}
}

return retVal.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public boolean isGlobal()
@Override
public boolean isConcrete()
{
return false;
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.segment;

import org.apache.druid.query.DataSource;
import org.joda.time.Interval;

/**
 * Utility for creating {@link Segment} objects for concrete datasources.
 *
 * @see org.apache.druid.guice.DruidBinders#segmentWranglerBinder to register factories
 */
public interface SegmentWrangler
{
  /**
   * Gets Segments for a particular datasource and set of intervals. These are expected to exist for any datasource
   * where {@link DataSource#isConcrete} and {@link DataSource#isGlobal} are both true (corresponding to datasources
   * where any Druid server could scan its data).
   *
   * Note: there are no SegmentWranglers for 'table' datasources (Druid's distributed datasources) because those are
   * special and handled in their own special way.
   *
   * @param dataSource the datasource to get segments for; expected to be a concrete, global datasource
   * @param intervals  time intervals of interest; used as a hint, not a strict filter (see return note)
   *
   * @return Segments that, collectively, contain data for dataSource. May be empty if dataSource does not exist or
   * has no data in the provided intervals. May contain data outside the provided intervals, so callers should
   * filter it down further, e.g. through the "interval" parameter of {@link StorageAdapter#makeCursors}
   */
  Iterable<Segment> getSegmentsForIntervals(DataSource dataSource, Iterable<Interval> intervals);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

/**
* Utility for creating {@link Joinable} objects.
*
* @see org.apache.druid.guice.DruidBinders#joinableFactoryBinder to register factories
*/
public interface JoinableFactory
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void test_isGlobal()
@Test
public void test_isConcrete()
{
Assert.assertFalse(listDataSource.isConcrete());
Assert.assertTrue(listDataSource.isConcrete());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void test_isGlobal()
@Test
public void test_isConcrete()
{
Assert.assertFalse(lookylooDataSource.isConcrete());
Assert.assertTrue(lookylooDataSource.isConcrete());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testLookup()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO);

Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Expand All @@ -153,7 +153,7 @@ public void testQueryOnLookup()
final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO);
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryDataSource);

Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertTrue(analysis.isQuery());
Expand All @@ -172,7 +172,7 @@ public void testInline()
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(INLINE);

Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Expand Down Expand Up @@ -378,7 +378,7 @@ public void testJoinLookupToLookup()

final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(joinDataSource);

Assert.assertFalse(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
Assert.assertTrue(analysis.isGlobal());
Assert.assertFalse(analysis.isQuery());
Expand Down
22 changes: 15 additions & 7 deletions server/src/main/java/org/apache/druid/guice/DruidBinders.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.DruidNode;

/**
*/
public class DruidBinders
{
public static MapBinder<Class<? extends Query>, QueryRunnerFactory> queryRunnerFactoryBinder(Binder binder)
{
return MapBinder.newMapBinder(
binder,
new TypeLiteral<Class<? extends Query>>(){},
new TypeLiteral<Class<? extends Query>>() {},
TypeLiteral.get(QueryRunnerFactory.class)
);
}
Expand All @@ -48,19 +47,28 @@ public static MapBinder<Class<? extends Query>, QueryToolChest> queryToolChestBi
{
return MapBinder.newMapBinder(
binder,
new TypeLiteral<Class<? extends Query>>(){},
new TypeLiteral<QueryToolChest>(){}
new TypeLiteral<Class<? extends Query>>() {},
new TypeLiteral<QueryToolChest>() {}
);
}

public static Multibinder<KeyHolder<DruidNode>> discoveryAnnouncementBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, new TypeLiteral<KeyHolder<DruidNode>>(){});
return Multibinder.newSetBinder(binder, new TypeLiteral<KeyHolder<DruidNode>>() {});
}

public static Multibinder<Class<? extends Monitor>> metricMonitorBinder(Binder binder)
{
return Multibinder.newSetBinder(binder, new TypeLiteral<Class<? extends Monitor>>(){});
return Multibinder.newSetBinder(binder, new TypeLiteral<Class<? extends Monitor>>() {});
}

/**
 * Creates a Guice {@link MapBinder} for registering {@link SegmentWrangler} implementations,
 * keyed by the {@link DataSource} subclass they handle.
 *
 * @param binder the Guice binder to attach the map binder to
 * @return a map binder from DataSource class to SegmentWrangler implementation
 */
public static MapBinder<Class<? extends DataSource>, SegmentWrangler> segmentWranglerBinder(Binder binder)
{
  return MapBinder.newMapBinder(
      binder,
      new TypeLiteral<Class<? extends DataSource>>() {},
      new TypeLiteral<SegmentWrangler>() {}
  );
}

public static MapBinder<Class<? extends DataSource>, JoinableFactory> joinableFactoryBinder(Binder binder)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.guice;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.LookupSegmentWrangler;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.SegmentWrangler;

import java.util.Map;

/**
 * Module that installs DataSource-class-specific {@link SegmentWrangler} implementations.
 */
public class SegmentWranglerModule implements Module
{
  /**
   * Default mappings of datasources to factories.
   */
  @VisibleForTesting
  static final Map<Class<? extends DataSource>, Class<? extends SegmentWrangler>> WRANGLER_MAPPINGS =
      ImmutableMap.of(
          InlineDataSource.class, InlineSegmentWrangler.class,
          LookupDataSource.class, LookupSegmentWrangler.class
      );

  @Override
  public void configure(Binder binder)
  {
    final MapBinder<Class<? extends DataSource>, SegmentWrangler> wranglerBindings =
        DruidBinders.segmentWranglerBinder(binder);

    // Register each default wrangler in the map binder, and bind its concrete class as a
    // lazy singleton so Guice can instantiate it on demand.
    for (Map.Entry<Class<? extends DataSource>, Class<? extends SegmentWrangler>> mapping
        : WRANGLER_MAPPINGS.entrySet()) {
      final Class<? extends SegmentWrangler> wranglerClass = mapping.getValue();
      wranglerBindings.addBinding(mapping.getKey()).to(wranglerClass);
      binder.bind(wranglerClass).in(LazySingleton.class);
    }

    // The top-level SegmentWrangler delegates to the per-datasource wranglers registered above.
    binder.bind(SegmentWrangler.class).to(MapSegmentWrangler.class).in(Scopes.SINGLETON);
  }
}
Loading