From ad59a6bb1bb3080062cf33215894153063c32a55 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 15 Jan 2016 10:23:50 +0900 Subject: [PATCH 1/7] Show candidate hosts for the given query --- .../druid/client/selector/ServerSelector.java | 14 ++ .../druid/query/LocatedSegmentDescriptor.java | 88 ++++++++++++ .../io/druid/server/BrokerQueryResource.java | 133 ++++++++++++++++++ .../java/io/druid/server/QueryResource.java | 20 +-- .../src/main/java/io/druid/cli/CliBroker.java | 6 +- 5 files changed, 248 insertions(+), 13 deletions(-) create mode 100644 server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java create mode 100644 server/src/main/java/io/druid/server/BrokerQueryResource.java diff --git a/server/src/main/java/io/druid/client/selector/ServerSelector.java b/server/src/main/java/io/druid/client/selector/ServerSelector.java index 3a4f58cfb8ba..900b97bfaa53 100644 --- a/server/src/main/java/io/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/io/druid/client/selector/ServerSelector.java @@ -19,10 +19,14 @@ package io.druid.client.selector; +import com.google.api.client.util.Lists; import com.google.common.collect.Sets; import com.metamx.emitter.EmittingLogger; +import io.druid.client.DruidServer; +import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; +import java.util.List; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; @@ -78,6 +82,16 @@ public boolean isEmpty() } } + public List getCandidates() { + List result = Lists.newArrayList(); + synchronized (this) { + for (QueryableDruidServer server : servers) { + result.add(server.getServer().getMetadata()); + } + } + return result; + } + public QueryableDruidServer pick() { synchronized (this) { diff --git a/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java new file mode 100644 index 000000000000..fd29895cf540 
--- /dev/null +++ b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java @@ -0,0 +1,88 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.util.Sets; +import com.google.common.collect.ImmutableList; +import io.druid.server.coordination.DruidServerMetadata; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Set; + +/** + */ +public class LocatedSegmentDescriptor extends SegmentDescriptor +{ + private final List locations; + + @JsonCreator + public LocatedSegmentDescriptor( + @JsonProperty("itvl") Interval interval, + @JsonProperty("ver") String version, + @JsonProperty("part") int partitionNumber, + @JsonProperty("locations") List locations + ) + { + super(interval, version, partitionNumber); + this.locations = locations == null ? 
ImmutableList.of() : locations; + } + + public LocatedSegmentDescriptor(SegmentDescriptor descriptor, List candidates) + { + this(descriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber(), candidates); + } + + @JsonProperty("locations") + public List getLocations() + { + return locations; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof LocatedSegmentDescriptor) || !super.equals(o)) { + return false; + } + + LocatedSegmentDescriptor other = (LocatedSegmentDescriptor) o; + return getHostNames().equals(other.getHostNames()); + } + + private Set getHostNames() + { + Set hostNames = Sets.newHashSet(); + for (DruidServerMetadata meta : locations) { + hostNames.add(meta.getHost()); + } + return hostNames; + } + + @Override + public int hashCode() + { + int result = super.hashCode(); + result = 31 * result + getHostNames().hashCode(); + return result; + } +} diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java new file mode 100644 index 000000000000..7f005c0d045c --- /dev/null +++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java @@ -0,0 +1,133 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. 
See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.metamx.common.guava.Sequences; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.TimelineServerView; +import io.druid.client.selector.ServerSelector; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.query.LocatedSegmentDescriptor; +import io.druid.query.Query; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.SegmentDescriptor; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.log.RequestLogger; +import io.druid.timeline.TimelineLookup; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.partition.PartitionChunk; +import org.joda.time.Interval; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +/** + */ +@Path("/druid/v2/") +public class BrokerQueryResource extends QueryResource +{ + private final TimelineServerView brokerServerView; + + @Inject + public BrokerQueryResource( + ServerConfig config, + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + QuerySegmentWalker texasRanger, + ServiceEmitter emitter, + RequestLogger requestLogger, + QueryManager queryManager, + TimelineServerView 
brokerServerView + ) + { + super(config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager); + this.brokerServerView = brokerServerView; + } + + @POST + @Path("/candidates") + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE}) + public Response getQueryTargets( + InputStream in, + @QueryParam("pretty") String pretty, + @Context final HttpServletRequest req // used only to get request content-type and remote address + ) throws IOException + { + final String reqContentType = req.getContentType(); + final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType) + || APPLICATION_SMILE.equals(reqContentType); + final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON; + + ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; + final ObjectWriter jsonWriter = pretty != null + ? 
objectMapper.writerWithDefaultPrettyPrinter() + : objectMapper.writer(); + + try { + Query query = objectMapper.readValue(in, Query.class); + TimelineLookup timeline = brokerServerView.getTimeline(query.getDataSource()); + if (timeline == null) { + return Response.ok(Sequences.empty()).build(); + } + List located = Lists.newArrayList(); + for (Interval interval : query.getIntervals()) { + for (TimelineObjectHolder holder : timeline.lookup(interval)) { + for (PartitionChunk chunk : holder.getObject()) { + ServerSelector selector = chunk.getObject(); + final SegmentDescriptor descriptor = new SegmentDescriptor( + holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() + ); + List candidates = selector.getCandidates(); + located.add(new LocatedSegmentDescriptor(descriptor, candidates)); + } + } + } + return Response.ok(jsonWriter.writeValueAsString(located), contentType).build(); + } + catch (Exception e) { + return Response.serverError().type(contentType).entity( + jsonWriter.writeValueAsString( + ImmutableMap.of( + "error", e.getMessage() == null ? 
"null exception" : e.getMessage() + ) + ) + ).build(); + } + } +} diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 0b9ac2b0fa50..04c2b9061447 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -68,19 +68,19 @@ @Path("/druid/v2/") public class QueryResource { - private static final EmittingLogger log = new EmittingLogger(QueryResource.class); + protected static final EmittingLogger log = new EmittingLogger(QueryResource.class); @Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE - private static final String APPLICATION_SMILE = "application/smile"; + protected static final String APPLICATION_SMILE = "application/smile"; - private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7*1024; + protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7*1024; - private final ServerConfig config; - private final ObjectMapper jsonMapper; - private final ObjectMapper smileMapper; - private final QuerySegmentWalker texasRanger; - private final ServiceEmitter emitter; - private final RequestLogger requestLogger; - private final QueryManager queryManager; + protected final ServerConfig config; + protected final ObjectMapper jsonMapper; + protected final ObjectMapper smileMapper; + protected final QuerySegmentWalker texasRanger; + protected final ServiceEmitter emitter; + protected final RequestLogger requestLogger; + protected final QueryManager queryManager; @Inject public QueryResource( diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java index 7994bd541e5a..95c25b6db0eb 100644 --- a/services/src/main/java/io/druid/cli/CliBroker.java +++ b/services/src/main/java/io/druid/cli/CliBroker.java @@ -44,9 +44,9 @@ import io.druid.query.QueryToolChestWarehouse; import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.lookup.LookupModule; 
+import io.druid.server.BrokerQueryResource; import io.druid.server.ClientInfoResource; import io.druid.server.ClientQuerySegmentWalker; -import io.druid.server.QueryResource; import io.druid.server.coordination.broker.DruidBroker; import io.druid.server.http.BrokerResource; import io.druid.server.initialization.jetty.JettyServerInitializer; @@ -103,10 +103,10 @@ public void configure(Binder binder) binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class); binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); - Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, BrokerQueryResource.class); Jerseys.addResource(binder, BrokerResource.class); Jerseys.addResource(binder, ClientInfoResource.class); - LifecycleModule.register(binder, QueryResource.class); + LifecycleModule.register(binder, BrokerQueryResource.class); LifecycleModule.register(binder, DruidBroker.class); MetricsModule.register(binder, CacheMonitor.class); From 4063c064666fd8c259b2d0a8c3fadd434c42accc Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Tue, 22 Mar 2016 18:13:52 +0900 Subject: [PATCH 2/7] added get method with datasource/intervals param --- .../druid/client/selector/ServerSelector.java | 2 +- .../druid/query/LocatedSegmentDescriptor.java | 2 +- .../io/druid/server/BrokerQueryResource.java | 63 ++++++++++++++----- 3 files changed, 50 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/io/druid/client/selector/ServerSelector.java b/server/src/main/java/io/druid/client/selector/ServerSelector.java index 900b97bfaa53..5354aefd9a41 100644 --- a/server/src/main/java/io/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/io/druid/client/selector/ServerSelector.java @@ -19,7 +19,7 @@ package io.druid.client.selector; -import com.google.api.client.util.Lists; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import 
com.metamx.emitter.EmittingLogger; import io.druid.client.DruidServer; diff --git a/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java index fd29895cf540..9d052e7ae9c6 100644 --- a/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java +++ b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.api.client.util.Sets; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; import io.druid.server.coordination.DruidServerMetadata; import org.joda.time.Interval; diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java index 7f005c0d045c..7fb109f0b65b 100644 --- a/server/src/main/java/io/druid/server/BrokerQueryResource.java +++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java @@ -29,12 +29,15 @@ import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.TimelineServerView; import io.druid.client.selector.ServerSelector; +import io.druid.common.utils.JodaUtils; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; +import io.druid.query.DataSource; import io.druid.query.LocatedSegmentDescriptor; import io.druid.query.Query; import io.druid.query.QuerySegmentWalker; import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; @@ -45,6 +48,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; +import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -54,6 +58,7 @@ import javax.ws.rs.core.Response; import 
java.io.IOException; import java.io.InputStream; +import java.util.Collections; import java.util.List; /** @@ -101,23 +106,10 @@ public Response getQueryTargets( try { Query query = objectMapper.readValue(in, Query.class); - TimelineLookup timeline = brokerServerView.getTimeline(query.getDataSource()); - if (timeline == null) { + List located = getTargetLocations(query.getDataSource(), query.getIntervals()); + if (located == null || located.isEmpty()) { return Response.ok(Sequences.empty()).build(); } - List located = Lists.newArrayList(); - for (Interval interval : query.getIntervals()) { - for (TimelineObjectHolder holder : timeline.lookup(interval)) { - for (PartitionChunk chunk : holder.getObject()) { - ServerSelector selector = chunk.getObject(); - final SegmentDescriptor descriptor = new SegmentDescriptor( - holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() - ); - List candidates = selector.getCandidates(); - located.add(new LocatedSegmentDescriptor(descriptor, candidates)); - } - } - } return Response.ok(jsonWriter.writeValueAsString(located), contentType).build(); } catch (Exception e) { @@ -130,4 +122,45 @@ public Response getQueryTargets( ).build(); } } + + @GET + @Path("/candidates") + @Produces(MediaType.APPLICATION_JSON) + public Response getQueryTargets( + @QueryParam("datasource") String datasource, + @QueryParam("intervals") String intervals + ) throws IOException + { + List intervalList = Lists.newArrayList(); + for (String interval : intervals.split(",")) { + intervalList.add(Interval.parse(interval.trim())); + } + List located = getTargetLocations( + new TableDataSource(datasource), + JodaUtils.condenseIntervals(intervalList) + ); + return Response.ok(located).build(); + } + + private List getTargetLocations(DataSource datasource, List intervals) + { + TimelineLookup timeline = brokerServerView.getTimeline(datasource); + if (timeline == null) { + return Collections.emptyList(); + } + List located = Lists.newArrayList(); + for 
(Interval interval : intervals) { + for (TimelineObjectHolder holder : timeline.lookup(interval)) { + for (PartitionChunk chunk : holder.getObject()) { + ServerSelector selector = chunk.getObject(); + final SegmentDescriptor descriptor = new SegmentDescriptor( + holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() + ); + List candidates = selector.getCandidates(); + located.add(new LocatedSegmentDescriptor(descriptor, candidates)); + } + } + } + return located; + } } From 718da13a81c14c36e48052c5cadb5e5c79fb1461 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 23 Mar 2016 10:01:35 +0900 Subject: [PATCH 3/7] provide approximated size of target segment --- .../druid/timeline/TimelineObjectHolder.java | 8 ++++ .../timeline/VersionedIntervalTimeline.java | 41 ++++++++++++++----- .../VersionedIntervalTimelineTest.java | 2 +- .../select/MultiSegmentSelectQueryTest.java | 6 +-- .../io/druid/segment/SchemalessIndex.java | 2 +- .../io/druid/client/BrokerServerView.java | 7 +++- .../druid/client/CoordinatorServerView.java | 3 +- .../IndexerSQLMetadataStorageCoordinator.java | 7 +++- .../metadata/SQLMetadataSegmentManager.java | 4 +- .../druid/query/LocatedSegmentDescriptor.java | 13 +++++- .../io/druid/server/BrokerQueryResource.java | 2 +- .../server/coordination/ServerManager.java | 3 +- .../DruidCoordinatorCleanupOvershadowed.java | 5 ++- .../helper/DruidCoordinatorSegmentMerger.java | 3 +- ...chingClusteredClientFunctionalityTest.java | 2 +- .../client/CachingClusteredClientTest.java | 4 +- .../FiniteAppenderatorDriverTest.java | 3 +- .../server/shard/NumberedShardSpecTest.java | 2 +- 18 files changed, 86 insertions(+), 31 deletions(-) diff --git a/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java b/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java index de7dcd28c43d..ecb16ceaf06f 100644 --- a/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java +++ 
b/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java @@ -28,16 +28,19 @@ public class TimelineObjectHolder implements LogicalSeg { private final Interval interval; private final VersionType version; + private final long approxSize; private final PartitionHolder object; public TimelineObjectHolder( Interval interval, VersionType version, + long approxSize, PartitionHolder object ) { this.interval = interval; this.version = version; + this.approxSize = approxSize; this.object = object; } @@ -52,6 +55,11 @@ public VersionType getVersion() return version; } + public long getApproximatedSize() + { + return approxSize; + } + public PartitionHolder getObject() { return object; diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java index 19219f59d160..a6ab3b8cb5f3 100644 --- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java +++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java @@ -41,17 +41,17 @@ /** * VersionedIntervalTimeline is a data structure that manages objects on a specific timeline. - * + *

* It associates a jodatime Interval and a generically-typed version with the object that is being stored. - * + *

* In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated * with a timeline entry remains unchanged when chunking occurs. - * + *

* After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most * recent objects (according to the version) that match the given interval. The intent is that objects represent * a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look * at in order to get a correct answer about that time period. - * + *

* The findOvershadowed() method returns a list of objects that will never be returned by a call to lookup() because * they are overshadowed by some other object. This can be used in conjunction with the add() and remove() methods * to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if @@ -81,6 +81,11 @@ public VersionedIntervalTimeline( } public void add(final Interval interval, VersionType version, PartitionChunk object) + { + add(interval, version, object, -1); + } + + public void add(final Interval interval, VersionType version, PartitionChunk object, long approxSize) { try { lock.writeLock().lock(); @@ -89,7 +94,7 @@ public void add(final Interval interval, VersionType version, PartitionChunk(object)); + entry = new TimelineEntry(interval, version, new PartitionHolder(object), approxSize); TreeMap versionEntry = new TreeMap(versionComparator); versionEntry.put(version, entry); allTimelineEntries.put(interval, versionEntry); @@ -97,7 +102,7 @@ public void add(final Interval interval, VersionType version, PartitionChunk(object)); + entry = new TimelineEntry(interval, version, new PartitionHolder(object), approxSize); exists.put(version, entry); } else { PartitionHolder partitionHolder = entry.getPartitionHolder(); @@ -179,7 +184,7 @@ public PartitionHolder findEntry(Interval interval, VersionType vers * @param interval interval to find objects for * * @return Holders representing the interval that the objects exist for, PartitionHolders - * are guaranteed to be complete + * are guaranteed to be complete */ public List> lookup(Interval interval) { @@ -244,6 +249,7 @@ public Set> findOvershadowed() new TimelineObjectHolder( object.getTrueInterval(), object.getVersion(), + object.getApproximatedSize(), object.getPartitionHolder() ) ); @@ -293,10 +299,10 @@ private void add( } /** - * * @param timeline * @param key * @param entry + * * @return boolean flag indicating whether or not we inserted or discarded 
something */ private boolean addAtKey( @@ -446,6 +452,7 @@ private List> lookup(Interval inte new TimelineObjectHolder( timelineInterval, val.getVersion(), + val.getApproximatedSize(), val.getPartitionHolder() ) ); @@ -464,6 +471,7 @@ private List> lookup(Interval inte new TimelineObjectHolder( new Interval(interval.getStart(), firstEntry.getInterval().getEnd()), firstEntry.getVersion(), + firstEntry.getApproximatedSize(), firstEntry.getObject() ) ); @@ -476,6 +484,7 @@ private List> lookup(Interval inte new TimelineObjectHolder( new Interval(lastEntry.getInterval().getStart(), interval.getEnd()), lastEntry.getVersion(), + lastEntry.getApproximatedSize(), lastEntry.getObject() ) ); @@ -489,12 +498,19 @@ public class TimelineEntry private final Interval trueInterval; private final VersionType version; private final PartitionHolder partitionHolder; - - public TimelineEntry(Interval trueInterval, VersionType version, PartitionHolder partitionHolder) + private final long approxSize; + + public TimelineEntry( + Interval trueInterval, + VersionType version, + PartitionHolder partitionHolder, + long approxSize + ) { this.trueInterval = trueInterval; this.version = version; this.partitionHolder = partitionHolder; + this.approxSize = approxSize; } public Interval getTrueInterval() @@ -511,5 +527,10 @@ public PartitionHolder getPartitionHolder() { return partitionHolder; } + + public long getApproximatedSize() + { + return approxSize; + } } } diff --git a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java index bc88e39d9050..0bcebf0b83bb 100644 --- a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java +++ b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java @@ -1552,7 +1552,7 @@ private void add(String interval, String version, PartitionChunk value) private void add(Interval interval, String version, PartitionChunk value) { - 
timeline.add(interval, version, value); + timeline.add(interval, version, value, -1); } private void assertValues( diff --git a/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java index 8b0aaae6ced2..978bcb42f15a 100644 --- a/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java +++ b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java @@ -139,9 +139,9 @@ public static void setup() throws IOException segment_override = new IncrementalIndexSegment(index2, makeIdentifier(index2, "v2")); VersionedIntervalTimeline timeline = new VersionedIntervalTimeline(StringComparators.LEXICOGRAPHIC); - timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk(segment0)); - timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk(segment1)); - timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk(segment_override)); + timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk(segment0), -1); + timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk(segment1), -1); + timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk(segment_override), -1); segmentIdentifiers = Lists.newArrayList(); for (TimelineObjectHolder holder : timeline.lookup(new Interval("2011-01-12/2011-01-14"))) { diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java index ab8e2019ab3a..34863d9754ae 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java @@ -448,7 +448,7 @@ private static QueryableIndex makeAppendedMMappedIndex( ShardSpec noneShardSpec = new NoneShardSpec(); for (int i = 0; i < intervals.size(); i++) { - timeline.add(intervals.get(i), i, 
noneShardSpec.createChunk(filesToMap.get(i))); + timeline.add(intervals.get(i), i, noneShardSpec.createChunk(filesToMap.get(i)), -1); } final List adapters = Lists.newArrayList( diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 95826bb60ca5..2b2a43cafd94 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -229,7 +229,12 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm timelines.put(segment.getDataSource(), timeline); } - timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector)); + timeline.add( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(selector), + segment.getSize() + ); selectors.put(segmentId, selector); } diff --git a/server/src/main/java/io/druid/client/CoordinatorServerView.java b/server/src/main/java/io/druid/client/CoordinatorServerView.java index e61d7a309dc3..1b076cec17ee 100644 --- a/server/src/main/java/io/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/io/druid/client/CoordinatorServerView.java @@ -141,7 +141,8 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm timeline.add( segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk(segmentLoadInfo) + segment.getShardSpec().createChunk(segmentLoadInfo), + segment.getSize() ); segmentLoadInfos.put(segmentId, segmentLoadInfo); } diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index db64cb6fa6df..74ba9bf74721 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -250,7 +250,12 @@ 
private VersionedIntervalTimeline getTimelineForIntervalsWi DataSegment.class ); - timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)); + timeline.add( + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(segment), + segment.getSize() + ); } diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java index 0dbeadbd1759..153680e77a88 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java @@ -54,7 +54,6 @@ import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; -import org.skife.jdbi.v2.exceptions.TransactionFailedException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -229,7 +228,8 @@ public VersionedIntervalTimeline fold( timeline.add( segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk(segment) + segment.getShardSpec().createChunk(segment), + segment.getSize() ); return timeline; diff --git a/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java index 9d052e7ae9c6..c5aac03d79dd 100644 --- a/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java +++ b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java @@ -33,6 +33,7 @@ */ public class LocatedSegmentDescriptor extends SegmentDescriptor { + private final long size; private final List locations; @JsonCreator @@ -40,16 +41,24 @@ public LocatedSegmentDescriptor( @JsonProperty("itvl") Interval interval, @JsonProperty("ver") String version, @JsonProperty("part") int partitionNumber, + @JsonProperty("size") long size, 
@JsonProperty("locations") List locations ) { super(interval, version, partitionNumber); + this.size = size; this.locations = locations == null ? ImmutableList.of() : locations; } - public LocatedSegmentDescriptor(SegmentDescriptor descriptor, List candidates) + public LocatedSegmentDescriptor(SegmentDescriptor descriptor, long size, List candidates) { - this(descriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber(), candidates); + this(descriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber(), size, candidates); + } + + @JsonProperty("size") + public long getSize() + { + return size; } @JsonProperty("locations") diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java index 7fb109f0b65b..85f61dc5c3cd 100644 --- a/server/src/main/java/io/druid/server/BrokerQueryResource.java +++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java @@ -157,7 +157,7 @@ private List getTargetLocations(DataSource datasource, holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() ); List candidates = selector.getCandidates(); - located.add(new LocatedSegmentDescriptor(descriptor, candidates)); + located.add(new LocatedSegmentDescriptor(descriptor, holder.getApproximatedSize(), candidates)); } } } diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 4b7c218aa167..07f9d6c86f67 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -186,7 +186,8 @@ public boolean loadSegment(final DataSegment segment) throws SegmentLoadingExcep loadedIntervals.add( segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter)) + segment.getShardSpec().createChunk(new 
ReferenceCountingSegment(adapter)), + segment.getSize() ); synchronized (dataSourceSizes) { dataSourceSizes.add(dataSource, segment.getSize()); diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java index 20bb23d6e417..c3b71c3cdfe8 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java @@ -68,7 +68,10 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) for (DataSegment segment : dataSource.getSegments()) { timeline.add( - segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment) + segment.getInterval(), + segment.getVersion(), + segment.getShardSpec().createChunk(segment), + segment.getSize() ); } } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java index d9fdbe894154..1c5ca7f66173 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java @@ -89,7 +89,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) timeline.add( dataSegment.getInterval(), dataSegment.getVersion(), - dataSegment.getShardSpec().createChunk(dataSegment) + dataSegment.getShardSpec().createChunk(dataSegment), + dataSegment.getSize() ); } } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java index 99d30caec6be..c468ef494f32 100644 --- 
a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java @@ -164,7 +164,7 @@ public QueryableDruidServer pick(TreeMap> pri } } ) - )); + ), -1); } protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService) diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index 4f09b05cfcfc..2bb0ad520cee 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -559,7 +559,7 @@ public void testCachingOverBulkLimitEnforcesLimit() throws Exception new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) ); selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), dataSegment); - timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector)); + timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector), -1); client.run(query, context); @@ -1734,7 +1734,7 @@ private List> populateTimeline( } chunk = new StringPartitionChunk<>(start, end, j, selector); } - timeline.add(queryIntervals.get(k), String.valueOf(k), chunk); + timeline.add(queryIntervals.get(k), String.valueOf(k), chunk, -1); } } return serverExpectationList; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java index 2e108fc45a23..e21c66115623 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java @@ -278,7 +278,8 @@ public Set findUsedSegments(Set identifiers) thr timeline.add( dataSegment.getInterval(), 
dataSegment.getVersion(), - dataSegment.getShardSpec().createChunk(dataSegment) + dataSegment.getShardSpec().createChunk(dataSegment), + dataSegment.getSize() ); } diff --git a/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java index 33187c17a984..d9a2d3033180 100644 --- a/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java +++ b/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java @@ -186,7 +186,7 @@ private void testVersionedIntervalTimelineBehaviorForNumberedShardSpec( Interval interval = new Interval("2000/3000"); String version = "v1"; for (PartitionChunk chunk : chunks) { - timeline.add(interval, version, chunk); + timeline.add(interval, version, chunk, -1); } Set actualObjects = new HashSet<>(); From 319f9fa759ef7e259f77a08a95afef0fe81fa29e Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 21 Apr 2016 10:10:32 +0900 Subject: [PATCH 4/7] Support queried input format for hive integration --- extensions-core/hive-extensions/pom.xml | 191 ++++++ .../io/druid/hive/ExpressionConverter.java | 196 +++++++ .../io/druid/hive/HiveDruidInputFormat.java | 139 +++++ .../java/io/druid/hive/HiveDruidSerDe.java | 135 +++++ .../druid/hive/ExpressionConverterTest.java | 127 ++++ .../druid/hive/HiveDruidInputFormatTest.java | 27 + .../io/druid/indexer/hadoop/MapWritable.java | 95 +++ .../indexer/hadoop/QueryBasedInputFormat.java | 553 ++++++++++++++++++ .../hadoop/QueryBasedInputFormatTest.java | 73 +++ pom.xml | 1 + 10 files changed, 1537 insertions(+) create mode 100644 extensions-core/hive-extensions/pom.xml create mode 100644 extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java create mode 100644 extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java create mode 100644 extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidSerDe.java create mode 100644 
extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java create mode 100644 extensions-core/hive-extensions/src/test/java/io/druid/hive/HiveDruidInputFormatTest.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/hadoop/MapWritable.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java diff --git a/extensions-core/hive-extensions/pom.xml b/extensions-core/hive-extensions/pom.xml new file mode 100644 index 000000000000..e348b7d9f883 --- /dev/null +++ b/extensions-core/hive-extensions/pom.xml @@ -0,0 +1,191 @@ + + + + + 4.0.0 + + io.druid.extensions + druid-hive-extensions + druid-hive-extensions + druid-hive-extensions + + + io.druid + druid + 0.9.1-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + ${project.parent.version} + provided + + + io.druid + druid-common + ${project.parent.version} + provided + + + io.druid + druid-indexing-hadoop + ${project.parent.version} + provided + + + org.apache.hive + hive-exec + 2.0.0 + + + + org.apache.hadoop + hadoop-client + compile + + + commons-cli + commons-cli + + + commons-httpclient + commons-httpclient + + + log4j + log4j + + + commons-codec + commons-codec + + + commons-logging + commons-logging + + + commons-io + commons-io + + + commons-lang + commons-lang + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + javax.ws.rs + jsr311-api + + + com.google.code.findbugs + jsr305 + + + org.mortbay.jetty + jetty-util + + + javax.activation + activation + + + com.google.protobuf + protobuf-java + + + com.sun.jersey + jersey-core + + + + + com.metamx + emitter + provided + + + commons-io + 
commons-io + provided + + + + + junit + junit + test + + + io.druid + druid-server + ${project.parent.version} + test + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.compile.version} + tests + test + + + org.apache.hadoop + hadoop-common + ${hadoop.compile.version} + tests + test + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.compile.version} + test + + + + diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java new file mode 100644 index 000000000000..e67a34655e72 --- /dev/null +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java @@ -0,0 +1,196 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.hive; + +import com.google.common.collect.Lists; +import com.metamx.common.logger.Logger; +import io.druid.common.utils.JodaUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.joda.time.Interval; + +import java.text.DateFormat; +import java.text.ParseException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +/** + */ +public class ExpressionConverter +{ + private static final Logger logger = new Logger(ExpressionConverter.class); + + public static final String TIME_COLUMN_NAME = "__time"; + + static List convert(Configuration jobConf) + { + String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filterExprSerialized == null) { + logger.info("No predicate is pushed down"); + return Collections.emptyList(); + } + List intervals = getIntervals(SerializationUtilities.deserializeExpression(filterExprSerialized)); + logger.info("Extracted intervals : " + intervals); + return intervals; + } + + static List getIntervals(ExprNodeGenericFuncDesc filterExpr) + { + logger.info("Start analyzing predicate " + filterExpr.getExprString()); + SearchArgument searchArgument = ConvertAstToSearchArg.create(filterExpr); + ExpressionTree root = searchArgument.getExpression(); + + List leaves = Lists.newArrayList(searchArgument.getLeaves()); + + List currents = null; + if (root.getOperator() == ExpressionTree.Operator.AND) { + for (ExpressionTree child : root.getChildren()) { + String extracted = extractSoleColumn(child, leaves, null); + if 
(TIME_COLUMN_NAME.equals(extracted)) { + List intervals = extractIntervals(child, leaves, false); + if (currents == null) { + currents = intervals; + continue; + } + // (a or b) and (c or d) -> (a and c) or (b and c) or (a and d) or (b and d) + List overlapped = Lists.newArrayList(); + for (Interval current : currents) { + for (Interval interval : intervals) { + Interval overlap = current.overlap(interval); + if (overlap != null) { + overlapped.add(overlap); + } + } + } + currents = overlapped; + } + } + } else { + String extracted = extractSoleColumn(root, leaves, null); + if (TIME_COLUMN_NAME.equals(extracted)) { + currents = extractIntervals(root, leaves, false); + } + } + return currents != null ? JodaUtils.condenseIntervals(currents) : Collections.emptyList(); + } + + private static String extractSoleColumn(ExpressionTree tree, List leaves, String current) + { + if (tree.getOperator() == ExpressionTree.Operator.LEAF) { + String columnName = leaves.get(tree.getLeaf()).getColumnName(); + if (current == null || current.equals(columnName)) { + return columnName; + } + return null; + } + List children = tree.getChildren(); + if (children != null && !children.isEmpty()) { + for (ExpressionTree child : children) { + current = extractSoleColumn(child, leaves, current); + } + } + return current; + } + + private static List extractIntervals(ExpressionTree tree, List leaves, boolean withNot) + { + if (tree.getOperator() == ExpressionTree.Operator.NOT) { + return extractIntervals(tree.getChildren().get(0), leaves, !withNot); + } + if (tree.getOperator() == ExpressionTree.Operator.LEAF) { + return leafToIntervals(leaves.get(tree.getLeaf()), withNot); + } + if (tree.getOperator() == ExpressionTree.Operator.OR) { + List intervals = Lists.newArrayList(); + for (ExpressionTree child : tree.getChildren()) { + List extracted = extractIntervals(child, leaves, withNot); + if (extracted != null) { + intervals.addAll(extracted); + } + } + } + return null; + } + + private static List 
leafToIntervals(PredicateLeaf hiveLeaf, boolean withNot) + { + switch (hiveLeaf.getOperator()) { + case LESS_THAN: + case LESS_THAN_EQUALS: + Long time = literalToTime(hiveLeaf.getLiteral()); + if (time != null) { + return Arrays.asList( + withNot ? new Interval(time, JodaUtils.MAX_INSTANT) : new Interval(JodaUtils.MIN_INSTANT, time) + ); + } + return null; + case BETWEEN: + Long start = literalToTime(hiveLeaf.getLiteralList().get(0)); + Long end = literalToTime(hiveLeaf.getLiteralList().get(1)); + if (start != null && end != null) { + return withNot ? Arrays.asList( + new Interval(JodaUtils.MIN_INSTANT, start), + new Interval(end, JodaUtils.MAX_INSTANT) + ) : Arrays.asList(new Interval(start, end)); + } + return null; + } + if (hiveLeaf.getOperator() == PredicateLeaf.Operator.EQUALS && hiveLeaf.getLiteral() instanceof String) { + try { + Interval interval = new Interval(hiveLeaf.getLiteral()); + return withNot ? Arrays.asList( + new Interval(JodaUtils.MIN_INSTANT, interval.getStartMillis()), + new Interval(interval.getEndMillis(), JodaUtils.MAX_INSTANT) + ) : Arrays.asList(interval); + } + catch (IllegalArgumentException e) { + // best effort. ignore + } + } + return null; + } + + private static Long literalToTime(Object literal) + { + if (literal instanceof Long) { + return (Long) literal; + } + if (literal instanceof Date) { + return ((Date) literal).getTime(); + } + if (literal instanceof String) { + try { + return DateFormat.getDateInstance().parse((String) literal).getTime(); + } + catch (ParseException e) { + // best effort. 
ignore + } + } + return null; + } +} diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java new file mode 100644 index 000000000000..2427171847c0 --- /dev/null +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java @@ -0,0 +1,139 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.hive; + +import com.google.common.base.Functions; +import com.google.common.collect.Lists; +import io.druid.indexer.hadoop.QueryBasedInputFormat; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; +import org.joda.time.Interval; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +public class HiveDruidInputFormat extends QueryBasedInputFormat implements HiveOutputFormat +{ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException + { + DruidInputSplit[] splits = getInputSplits(job); + + String input = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, ""); + String[] dirs = org.apache.hadoop.util.StringUtils.split(input); + if (dirs.length == 0) { + throw new IllegalStateException("input dir is null"); + } + Path path = new Path(dirs[0]); + InputSplit[] converted = new InputSplit[splits.length]; + for (int i = 0; i < converted.length; i++) { + converted[i] = new InputSplitWrapper(path, splits[i]); + } + return converted; + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException + { + DruidRecordReader reader = new DruidRecordReader(); + reader.initialize(((InputSplitWrapper) split).druidSplit, job); + return reader; + } + + @Override + protected final Configuration configure(Configuration 
configuration) + { + List intervals = ExpressionConverter.convert(configuration); + if (intervals == null || intervals.isEmpty()) { + throw new IllegalArgumentException("failed to extract intervals from predicate"); + } + configuration.set( + CONF_DRUID_INTERVALS, + StringUtils.join(Lists.transform(intervals, Functions.toStringFunction()), ",") + ); + return configuration; + } + + @Override + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) + throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter( + JobConf jc, + Path finalOutPath, + Class valueClass, + boolean isCompressed, + Properties tableProperties, + Progressable progress + ) throws IOException + { + throw new UnsupportedOperationException(); + } + + public static class InputSplitWrapper extends FileSplit + { + private DruidInputSplit druidSplit; + + public InputSplitWrapper() {} + + public InputSplitWrapper(Path path, DruidInputSplit druidSplit) + { + super(path, 0, 0, druidSplit.getLocations()); + this.druidSplit = druidSplit; + } + + public void write(DataOutput out) throws IOException + { + super.write(out); + druidSplit.write(out); + } + + public void readFields(DataInput in) throws IOException + { + super.readFields(in); + this.druidSplit = new DruidInputSplit(); + this.druidSplit.readFields(in); + } + } +} \ No newline at end of file diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidSerDe.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidSerDe.java new file mode 100644 index 000000000000..c09763ab3686 --- /dev/null +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidSerDe.java @@ -0,0 +1,135 @@ +/* + * Licensed to Metamarkets Group 
Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.hive; + +import com.google.common.collect.Lists; +import com.metamx.common.logger.Logger; +import io.druid.indexer.hadoop.MapWritable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Writable; +import org.joda.time.DateTime; + +import java.sql.Timestamp; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +/** + */ +public class HiveDruidSerDe extends AbstractSerDe +{ + private static final Logger logger = new Logger(HiveDruidSerDe.class); + + private String[] 
columns; + private ObjectInspector inspector; + + private int timeIndex = -1; + private PrimitiveObjectInspector.PrimitiveCategory timeConvert; + + @Override + public void initialize(Configuration configuration, Properties properties) throws SerDeException + { + LazySerDeParameters serdeParams = new LazySerDeParameters(configuration, properties, getClass().getName()); + + List columnNames = serdeParams.getColumnNames(); + List columnTypes = serdeParams.getColumnTypes(); + + List inspectors = Lists.newArrayListWithExpectedSize(columnNames.size()); + for (int i = 0; i < columnTypes.size(); ++i) { + PrimitiveTypeInfo typeInfo = (PrimitiveTypeInfo) columnTypes.get(i); + inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo)); + if (typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING + && ExpressionConverter.TIME_COLUMN_NAME.equals(columnNames.get(i))) { + timeConvert = typeInfo.getPrimitiveCategory(); + timeIndex = i; + } + } + if (timeConvert != null && + timeConvert != PrimitiveObjectInspector.PrimitiveCategory.LONG && + timeConvert != PrimitiveObjectInspector.PrimitiveCategory.DATE && + timeConvert != PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP) { + logger.warn("Not supported time conversion type " + timeConvert + ".. 
regarding to string"); + inspectors.set(timeIndex, PrimitiveObjectInspectorFactory.javaStringObjectInspector); + timeIndex = -1; + } + inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); + columns = columnNames.toArray(new String[columnNames.size()]); + } + + @Override + public Class getSerializedClass() + { + throw new UnsupportedOperationException(); + } + + @Override + public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException + { + throw new UnsupportedOperationException(); + } + + @Override + public SerDeStats getSerDeStats() + { + return new SerDeStats(); + } + + @Override + public Object deserialize(Writable writable) throws SerDeException + { + MapWritable input = (MapWritable) writable; + List output = Lists.newArrayListWithExpectedSize(columns.length); + for (int i = 0; i < columns.length; i++) { + Object v = input.getValue().get(columns[i]); + if (v != null && i == timeIndex) { + long timeMillis = new DateTime(v).getMillis(); + switch (timeConvert) { + case LONG: + v = timeMillis; + break; + case DATE: + v = new Date(timeMillis); + break; + case TIMESTAMP: + v = new Timestamp(timeMillis); + break; + } + } + output.add(v); + } + return output; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException + { + return inspector; + } +} diff --git a/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java new file mode 100644 index 000000000000..907a0dae1891 --- /dev/null +++ b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.hive; + +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.UDFToDouble; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.junit.Test; + +import java.sql.Date; +import java.util.Arrays; + +public class ExpressionConverterTest +{ + @Test + public void test() + { + ExprNodeColumnDesc longTime = new ExprNodeColumnDesc(Long.class, "__time", "some_table", false); + ExprNodeColumnDesc dateTime = new ExprNodeColumnDesc(Date.class, "__time", "some_table", false); + + ExprNodeColumnDesc someColumn1 = new ExprNodeColumnDesc(String.class, "col1", "some_table", false); + ExprNodeColumnDesc someColumn2 = new ExprNodeColumnDesc(String.class, "col2", "some_table", false); + + GenericUDFBridge timeAsDouble = 
new GenericUDFBridge("double", false, UDFToDouble.class.getName()); + ExprNodeGenericFuncDesc gt = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPGreaterThan(), + Arrays.asList( + new ExprNodeGenericFuncDesc(PrimitiveObjectInspectorFactory.javaLongObjectInspector, + timeAsDouble, Arrays.asList(longTime)), + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 1031555555123L) + ) + ); + ExprNodeGenericFuncDesc lt = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPLessThan(), + Arrays.asList( + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 1231555555123L) + ) + ); + ExprNodeGenericFuncDesc noise1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPLessThan(), + Arrays.asList( + someColumn1, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value1") + ) + ); + ExprNodeGenericFuncDesc noise2 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPLessThan(), + Arrays.asList( + someColumn2, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value2") + ) + ); + ExprNodeGenericFuncDesc range = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(lt, gt) + ); + + ExprNodeGenericFuncDesc between = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFBetween(), + Arrays.asList( + new ExprNodeConstantDesc(false), + dateTime, + new ExprNodeConstantDesc(TypeInfoFactory.dateTypeInfo, new Date(105, 3, 12)), + new ExprNodeConstantDesc(TypeInfoFactory.dateTypeInfo, new Date(115, 3, 14)) + ) + ); + + ExprNodeGenericFuncDesc complex1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList( + new 
ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(noise1, lt) + ), + between, + new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(gt, noise2) + ) + ) + ); + + + System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(gt)); + System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(lt)); + System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(range)); + System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(between)); + + System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(complex1)); + } +} \ No newline at end of file diff --git a/extensions-core/hive-extensions/src/test/java/io/druid/hive/HiveDruidInputFormatTest.java b/extensions-core/hive-extensions/src/test/java/io/druid/hive/HiveDruidInputFormatTest.java new file mode 100644 index 000000000000..e0c4cce8a12c --- /dev/null +++ b/extensions-core/hive-extensions/src/test/java/io/druid/hive/HiveDruidInputFormatTest.java @@ -0,0 +1,27 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.hive; + +import static org.junit.Assert.*; + +public class HiveDruidInputFormatTest +{ + +} \ No newline at end of file diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/MapWritable.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/MapWritable.java new file mode 100644 index 000000000000..ab4408156ebb --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/MapWritable.java @@ -0,0 +1,95 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.Maps; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + */ +public class MapWritable implements Writable +{ + private final Map value; + + public MapWritable() + { + value = Maps.newHashMap(); + } + + public MapWritable(Map value) + { + this.value = value; + } + + public Map getValue() + { + return value; + } + + public void update(Map newMap) + { + value.clear(); + value.putAll(newMap); + } + + @Override + public void write(DataOutput out) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void readFields(DataInput in) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() + { + return Objects.hashCode(value); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + return Objects.equals(value, ((MapWritable) o).value); + + } + + @Override + public String toString() + { + return "MapWritable{value=" + value + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java new file mode 100644 index 000000000000..16fe06f6fc68 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java @@ -0,0 +1,553 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.Pair; +import com.metamx.common.StringUtils; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.collections.CountingMap; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.jackson.DruidDefaultSerializersModule; +import io.druid.query.Druids; +import io.druid.query.LocatedSegmentDescriptor; +import io.druid.query.Result; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.ExtractionDimensionSpec; +import io.druid.query.extraction.TimeFormatExtractionFn; +import io.druid.query.filter.DimFilter; +import io.druid.query.select.EventHolder; +import io.druid.query.select.PagingSpec; +import 
io.druid.query.select.SelectQuery; +import io.druid.query.select.SelectResultValue; +import io.druid.segment.column.Column; +import io.druid.server.coordination.DruidServerMetadata; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Interval; + +import javax.ws.rs.core.MediaType; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +public class QueryBasedInputFormat extends InputFormat + implements org.apache.hadoop.mapred.InputFormat +{ + protected static final Logger logger = new Logger(QueryBasedInputFormat.class); + + public static final String CONF_DRUID_BROKER_ADDRESS = "druid.broker.address"; + public static final String CONF_DRUID_DATASOURCE = "druid.datasource"; + public static final String CONF_DRUID_INTERVALS = "druid.intervals"; + public static final String CONF_DRUID_FILTERS = "druid.filters"; + + public static final String CONF_MAX_SPLIT_SIZE = "druid.max.split.size"; + public static final String CONF_SELECT_THRESHOLD = "druid.select.threshold"; + + public static final String CONF_SELECT_COLUMNS = 
"hive.io.file.readcolumn.names"; + + public static final int DEFAULT_SELECT_THRESHOLD = 10000; + + protected Configuration configure(Configuration configuration) + { + return configuration; + } + + @Override + public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits) throws IOException + { + return getInputSplits(job); + } + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException + { + return Arrays.asList(getInputSplits(context.getConfiguration())); + } + + protected DruidInputSplit[] getInputSplits(Configuration conf) throws IOException + { + conf = configure(conf); + + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new DruidDefaultSerializersModule()); + + String brokerAddress = Preconditions.checkNotNull(conf.get(CONF_DRUID_BROKER_ADDRESS), "Missing broker address"); + String dataSource = Preconditions.checkNotNull(conf.get(CONF_DRUID_DATASOURCE), "Missing datasource name"); + String intervals = Preconditions.checkNotNull(conf.get(CONF_DRUID_INTERVALS), "Missing interval"); + + String requestURL = + String.format( + "%s/druid/v2/candidates?datasource=%s&intervals=%s", + brokerAddress, + dataSource, + URLEncoder.encode(intervals, StringUtils.UTF8_STRING) + ); + + + Lifecycle lifecycle = new Lifecycle(); + HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), lifecycle); + + StatusResponseHolder response; + try { + lifecycle.start(); + response = client.go( + new Request(HttpMethod.GET, new URL(requestURL)), + new StatusResponseHandler(Charsets.UTF_8) + ).get(); + } + catch (Exception e) { + throw new IOException(e instanceof ExecutionException ? 
e.getCause() : e); + } + finally { + lifecycle.stop(); + } + + final List segments = mapper.readValue( + response.getContent(), + new TypeReference>() + { + } + ); + if (segments == null || segments.size() == 0) { + throw new IllegalStateException("No segments found to read"); + } + + logger.info("segments to read [%s]", segments); + + long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0); + + if (maxSize > 0) { + Collections.shuffle(segments); + for (LocatedSegmentDescriptor segment : segments) { + maxSize = Math.max(maxSize, segment.getSize()); + } + } + + List splits = Lists.newArrayList(); + + List currentGroup = new ArrayList<>(); + long currentSize = 0; + + for (LocatedSegmentDescriptor segment : segments) { + if (maxSize > 0 && currentSize + segment.getSize() > maxSize) { + splits.add(toSplit(dataSource, currentGroup)); + currentGroup = Lists.newArrayList(); + currentSize = 0; + } + + currentGroup.add(segment); + currentSize += segment.getSize(); + } + + if (!currentGroup.isEmpty()) { + splits.add(toSplit(dataSource, currentGroup)); + } + + logger.info("Number of splits [%d]", splits.size()); + return splits.toArray(new DruidInputSplit[splits.size()]); + } + + @Override + public org.apache.hadoop.mapred.RecordReader getRecordReader( + org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) throws IOException + { + DruidRecordReader reader = new DruidRecordReader(); + reader.initialize((DruidInputSplit) split, job); + return reader; + } + + @Override + public RecordReader createRecordReader( + InputSplit split, + TaskAttemptContext context + ) throws IOException, InterruptedException + { + return new DruidRecordReader(); + } + + private DruidInputSplit toSplit(String dataSource, List segments) + { + String[] locations = null; + try { + locations = getFrequentLocations(segments); + } + catch (Exception e) { + logger.error(e, "Exception thrown finding location of splits"); + } + long size = 0; + List intervals = Lists.newArrayList(); + for 
(LocatedSegmentDescriptor segment : segments) { + size += segment.getSize(); + intervals.add(segment.getInterval()); + } + return new DruidInputSplit(dataSource, locations, intervals, size); + } + + private String[] getFrequentLocations(List segments) throws IOException + { + List locations = Lists.newArrayList(); + for (LocatedSegmentDescriptor segment : segments) { + for (DruidServerMetadata location : segment.getLocations()) { + locations.add(location.getHost()); + } + } + return getMostFrequentLocations(locations); + } + + private static String[] getMostFrequentLocations(Iterable hosts) + { + final CountingMap counter = new CountingMap<>(); + for (String location : hosts) { + counter.add(location, 1); + } + + final TreeSet> sorted = Sets.>newTreeSet( + new Comparator>() + { + @Override + public int compare(Pair o1, Pair o2) + { + int compare = o2.lhs.compareTo(o1.lhs); // descending + if (compare == 0) { + compare = o1.rhs.compareTo(o2.rhs); // ascending + } + return compare; + } + } + ); + + for (Map.Entry entry : counter.entrySet()) { + sorted.add(Pair.of(entry.getValue().get(), entry.getKey())); + } + + // use default replication factor, if possible + final List locations = Lists.newArrayListWithCapacity(3); + for (Pair frequent : Iterables.limit(sorted, 3)) { + locations.add(frequent.rhs); + } + return locations.toArray(new String[locations.size()]); + } + + public static final class DruidInputSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit + { + private String dataSource; + private List intervals; + private String[] locations; + private long length; + + //required for deserialization + public DruidInputSplit() + { + } + + public DruidInputSplit(String dataSource, String[] locations, List intervals, long length) + { + this(); + this.dataSource = dataSource; + this.intervals = intervals; + this.locations = locations; + this.length = length; + } + + @Override + public long getLength() + { + return length; + } + + @Override + public 
String[] getLocations() + { + return locations; + } + + public String getDataSource() + { + return dataSource; + } + + public List getIntervals() + { + return intervals; + } + + @Override + public void write(DataOutput out) throws IOException + { + out.writeUTF(dataSource); + out.writeInt(intervals.size()); + for (String interval : Lists.transform(intervals, Functions.toStringFunction())) { + out.writeUTF(interval); + } + out.writeInt(locations.length); + for (String location : locations) { + out.writeUTF(location); + } + out.writeLong(length); + } + + @Override + public void readFields(DataInput in) throws IOException + { + dataSource = in.readUTF(); + intervals = Lists.newArrayList(); + for (int i = in.readInt(); i > 0; i--) { + intervals.add(new Interval(in.readUTF())); + } + locations = new String[in.readInt()]; + for (int i = 0; i < locations.length; i++) { + locations[i] = in.readUTF(); + } + length = in.readLong(); + } + + @Override + public String toString() + { + return "DruidInputSplit{" + + "dataSource=" + dataSource + + "intervals=" + intervals + + ", locations=" + Arrays.toString(locations) + + '}'; + } + } + + public static class DruidRecordReader extends RecordReader + implements org.apache.hadoop.mapred.RecordReader + { + private final Lifecycle lifecycle = new Lifecycle(); + + private int threshold; + private ObjectMapper mapper; + private HttpClient client; + private Druids.SelectQueryBuilder builder; + private Request request; + + private boolean finished; + private Iterator events = Iterators.emptyIterator(); + private Map paging = null; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException + { + initialize((DruidInputSplit) split, context.getConfiguration()); + } + + public void initialize(DruidInputSplit split, Configuration configuration) throws IOException + { + logger.info("Start loading " + split); + + String location = split.getLocations()[0]; + String dataSource = split.getDataSource(); + 
List intervals = split.getIntervals(); + + client = HttpClientInit.createClient(HttpClientConfig.builder().build(), lifecycle); + + mapper = new DefaultObjectMapper(); + threshold = configuration.getInt(CONF_SELECT_THRESHOLD, DEFAULT_SELECT_THRESHOLD); + + builder = new Druids.SelectQueryBuilder() + .dataSource(dataSource) + .intervals(intervals) + .granularity(QueryGranularity.ALL); + + List dimensionSpecs = Lists.newArrayList(); + for (String column : configuration.get(CONF_SELECT_COLUMNS).split(",")) { + column = column.trim(); + if (column.equals(Column.TIME_COLUMN_NAME)) { + dimensionSpecs.add( + new ExtractionDimensionSpec( + column, + column, + new TimeFormatExtractionFn("yyyy-MM-dd'T'HH:mm:ss'Z'", null, null) + ) + ); + } else { + dimensionSpecs.add(new DefaultDimensionSpec(column, column)); + } + } + + builder.dimensionSpecs(dimensionSpecs); + + String filters = configuration.get(CONF_DRUID_FILTERS); + if (filters != null && !filters.isEmpty()) { + builder.filters(mapper.readValue(filters, DimFilter.class)); + } + + request = new Request( + HttpMethod.POST, + new URL(String.format("%s/druid/v2", "http://" + location)) + ); + try { + lifecycle.start(); + } + catch (Exception e) { + throw new IOException(e); + } + } + + private void nextPage() throws IOException, InterruptedException + { + PagingSpec pagingSpec = new PagingSpec(paging, threshold, true); + SelectQuery query = builder.pagingSpec(pagingSpec).build(); + + StatusResponseHolder response; + try { + response = client.go( + request.setContent(mapper.writeValueAsBytes(query)) + .setHeader( + HttpHeaders.Names.CONTENT_TYPE, + MediaType.APPLICATION_JSON + ), + new StatusResponseHandler(Charsets.UTF_8) + ).get(); + } + catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + + HttpResponseStatus status = response.getStatus(); + if (!status.equals(HttpResponseStatus.OK)) { + throw new RuntimeException(response.getContent()); + } + + List> value = mapper.readValue( + 
response.getContent(), + new TypeReference>>() + { + } + ); + if (!value.isEmpty()) { + SelectResultValue result = value.get(0).getValue(); + events = result.iterator(); + paging = result.getPagingIdentifiers(); + } else { + events = Iterators.emptyIterator(); + finished = true; + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException + { + if (!finished && !events.hasNext()) { + nextPage(); + } + return events.hasNext(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException + { + return NullWritable.get(); + } + + @Override + public MapWritable getCurrentValue() throws IOException, InterruptedException + { + return new MapWritable(events.next().getEvent()); + } + + @Override + public float getProgress() throws IOException + { + return finished ? 1 : 0; + } + + @Override + public NullWritable createKey() + { + return NullWritable.get(); + } + + @Override + public MapWritable createValue() + { + return new MapWritable(); + } + + @Override + public boolean next(NullWritable key, MapWritable value) throws IOException + { + try { + if (nextKeyValue()) { + value.update(events.next().getEvent()); + return true; + } + } + catch (InterruptedException e) { + // ignore + } + return false; + } + + @Override + public long getPos() throws IOException + { + return 0; + } + + @Override + public void close() throws IOException + { + lifecycle.stop(); + } + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java new file mode 100644 index 000000000000..9f36162d976b --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java @@ -0,0 +1,73 @@ +package io.druid.indexer.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import 
org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + */ +public class QueryBasedInputFormatTest +{ + @Test + public void test() throws IOException, InterruptedException + { + Configuration conf = new Configuration(); + conf.set(QueryBasedInputFormat.CONF_DRUID_BROKER_ADDRESS, "http://localhost:8082"); + conf.set(QueryBasedInputFormat.CONF_DRUID_DATASOURCE, "wikipedia"); + conf.set(QueryBasedInputFormat.CONF_DRUID_INTERVALS, "2010-12-02T03:00:00.000Z/2015-12-02T04:00:00.000Z"); + conf.set(QueryBasedInputFormat.CONF_SELECT_COLUMNS, "__time, page, language, count, added, delta, deleted"); + + JobContext context = EasyMock.createMock(JobContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(conf); + EasyMock.replay(context); + + TaskAttemptContext attemptContext = EasyMock.createMock(TaskAttemptContext.class); + EasyMock.expect(attemptContext.getConfiguration()).andReturn(conf); + EasyMock.replay(attemptContext); + + QueryBasedInputFormat format = new QueryBasedInputFormat(); + Assert.assertTrue(format instanceof InputFormat); + Assert.assertTrue(format instanceof org.apache.hadoop.mapred.InputFormat); + for (InputSplit split : format.getSplits(context)) { + QueryBasedInputFormat.DruidInputSplit dsplit = (QueryBasedInputFormat.DruidInputSplit) split; + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + dsplit.write(new DataOutputStream(bout)); + + dsplit = new QueryBasedInputFormat.DruidInputSplit(); + dsplit.readFields(new DataInputStream(new ByteArrayInputStream(bout.toByteArray()))); + 
System.out.println("[DruidInputFormatTest/test] " + split); + RecordReader reader = format.createRecordReader(dsplit, null); + reader.initialize(split, attemptContext); + + while (reader.nextKeyValue()) { + System.out.println("[DruidInputFormatTest/test] " + reader.getCurrentValue()); + } + reader.close(); + + org.apache.hadoop.mapred.RecordReader reader2 = + format.getRecordReader(dsplit, new JobConf(conf), null); + + NullWritable key = reader2.createKey(); + MapWritable value = reader2.createValue(); + while (reader2.next(key, value)) { + System.out.println("[DruidInputFormatTest/test] " + value); + } + reader2.close(); + } + } +} diff --git a/pom.xml b/pom.xml index e011e0762ce6..23a557c6b641 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ extensions-core/postgresql-metadata-storage extensions-core/namespace-lookup extensions-core/s3-extensions + extensions-core/hive-extensions extensions-contrib/azure-extensions extensions-contrib/cassandra-storage From bb9378d5ea8d2df3c983a7abc1053b4b9cc2dcce Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Sat, 23 Apr 2016 09:19:29 +0900 Subject: [PATCH 5/7] support filter pushdown on dimensions --- extensions-core/hive-extensions/pom.xml | 6 + .../io/druid/hive/ExpressionConverter.java | 342 ++++++++++++++---- .../io/druid/hive/HiveDruidInputFormat.java | 27 +- .../src/main/java/io/druid/hive/Ranges.java | 122 +++++++ .../druid/hive/ExpressionConverterTest.java | 246 +++++++++++-- .../indexer/hadoop/QueryBasedInputFormat.java | 74 ++-- 6 files changed, 681 insertions(+), 136 deletions(-) create mode 100644 extensions-core/hive-extensions/src/main/java/io/druid/hive/Ranges.java diff --git a/extensions-core/hive-extensions/pom.xml b/extensions-core/hive-extensions/pom.xml index e348b7d9f883..6ebb8628ded2 100644 --- a/extensions-core/hive-extensions/pom.xml +++ b/extensions-core/hive-extensions/pom.xml @@ -44,6 +44,12 @@ ${project.parent.version} provided + + io.druid + druid-processing + ${project.parent.version} + 
provided + io.druid druid-indexing-hadoop diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java index e67a34655e72..8206cba86d16 100644 --- a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java @@ -19,9 +19,21 @@ package io.druid.hive; +import com.google.common.base.Function; +import com.google.common.collect.BoundType; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; import com.metamx.common.logger.Logger; import io.druid.common.utils.JodaUtils; +import io.druid.query.filter.BoundDimFilter; +import io.druid.query.filter.DimFilter; +import io.druid.query.filter.InDimFilter; +import io.druid.query.filter.OrDimFilter; +import io.druid.query.filter.SelectorDimFilter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -30,14 +42,23 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.joda.time.Interval; +import java.sql.Timestamp; import java.text.DateFormat; import java.text.ParseException; import java.util.Arrays; import java.util.Collections; 
import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; /** */ @@ -47,19 +68,96 @@ public class ExpressionConverter public static final String TIME_COLUMN_NAME = "__time"; - static List convert(Configuration jobConf) + static Map> convert(Configuration configuration) { - String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR); if (filterExprSerialized == null) { logger.info("No predicate is pushed down"); - return Collections.emptyList(); + return Collections.emptyMap(); } - List intervals = getIntervals(SerializationUtilities.deserializeExpression(filterExprSerialized)); - logger.info("Extracted intervals : " + intervals); + ExprNodeGenericFuncDesc expr = SerializationUtilities.deserializeExpression(filterExprSerialized); + return getRanges(expr, getColumnTypes(configuration)); + } + + public static List toInterval(List ranges) + { + List intervals = Lists.transform( + ranges, new Function() + { + @Override + public Interval apply(Range range) + { + long start = range.hasLowerBound() ? toLong(range.lowerEndpoint()) : JodaUtils.MIN_INSTANT; + long end = range.hasUpperBound() ? 
toLong(range.upperEndpoint()) : JodaUtils.MAX_INSTANT; + if (range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN) { + start++; + } + if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) { + end--; + } + return new Interval(start, end); + } + } + ); + logger.info("Converted time ranges %s to interval %s", ranges, intervals); return intervals; } - static List getIntervals(ExprNodeGenericFuncDesc filterExpr) + // should be string type + public static DimFilter toFilter(String dimension, List ranges) + { + Iterable filtered = Iterables.filter(ranges, Ranges.VALID); + List equalValues = Lists.newArrayList(); + List dimFilters = Lists.newArrayList(); + for (Range range : filtered) { + String lower = range.hasLowerBound() ? (String) range.lowerEndpoint() : null; + String upper = range.hasUpperBound() ? (String) range.upperEndpoint() : null; + if (lower == null && upper == null) { + return null; + } + if (Objects.equals(lower, upper)) { + equalValues.add(lower); + continue; + } + boolean lowerStrict = range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN; + boolean upperStrict = range.hasUpperBound() && range.upperBoundType() == BoundType.OPEN; + dimFilters.add(new BoundDimFilter(dimension, lower, upper, lowerStrict, upperStrict, false, null)); + } + if (equalValues.size() > 1) { + dimFilters.add(new InDimFilter(dimension, equalValues, null)); + } else if (equalValues.size() == 1) { + dimFilters.add(new SelectorDimFilter(dimension, equalValues.get(0), null)); + } + DimFilter dimFilter = new OrDimFilter(dimFilters).optimize(); + logger.info("Converted dimension '%s' ranges %s to filter %s", dimension, ranges, dimFilter); + return dimFilter; + } + + private static Map getColumnTypes(Configuration configuration) + { + String[] colNames = configuration.getStrings(serdeConstants.LIST_COLUMNS); + String[] colTypes = configuration.getStrings(serdeConstants.LIST_COLUMN_TYPES); + Set projections = 
Sets.newHashSet(ColumnProjectionUtils.getReadColumnIDs(configuration)); + if (colNames == null || colTypes == null) { + return ImmutableMap.of(); + } + Map typeMap = Maps.newHashMap(); + for (int i = 0; i < colTypes.length; i++) { + if (!projections.isEmpty() && !projections.contains(i)) { + continue; + } + String colName = colNames[i].trim(); + PrimitiveTypeInfo typeInfo = TypeInfoFactory.getPrimitiveTypeInfo(colTypes[i]); + if (colName.equals(ExpressionConverter.TIME_COLUMN_NAME) && + typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG) { + logger.warn("time column should be defined as bigint type, yet"); + } + typeMap.put(colName, typeInfo); + } + return typeMap; + } + + static Map> getRanges(ExprNodeGenericFuncDesc filterExpr, Map types) { logger.info("Start analyzing predicate " + filterExpr.getExprString()); SearchArgument searchArgument = ConvertAstToSearchArg.create(filterExpr); @@ -67,123 +165,181 @@ static List getIntervals(ExprNodeGenericFuncDesc filterExpr) List leaves = Lists.newArrayList(searchArgument.getLeaves()); - List currents = null; + Map> rangeMap = Maps.newHashMap(); if (root.getOperator() == ExpressionTree.Operator.AND) { for (ExpressionTree child : root.getChildren()) { - String extracted = extractSoleColumn(child, leaves, null); - if (TIME_COLUMN_NAME.equals(extracted)) { - List intervals = extractIntervals(child, leaves, false); - if (currents == null) { - currents = intervals; - continue; - } - // (a or b) and (c or d) -> (a and c) or (b and c) or (a and d) or (b and d) - List overlapped = Lists.newArrayList(); - for (Interval current : currents) { - for (Interval interval : intervals) { - Interval overlap = current.overlap(interval); - if (overlap != null) { - overlapped.add(overlap); - } + String extracted = extractSoleColumn(child, leaves); + if (extracted == null) { + continue; + } + PrimitiveTypeInfo type = types.get(extracted); + List ranges = extractRanges(type, child, leaves, false); + if (ranges == 
null) { + continue; + } + if (rangeMap.get(extracted) == null) { + rangeMap.put(extracted, ranges); + continue; + } + // (a or b) and (c or d) -> (a and c) or (b and c) or (a and d) or (b and d) + List overlapped = Lists.newArrayList(); + for (Range current : rangeMap.get(extracted)) { + for (Range interval : ranges) { + if (current.isConnected(interval)) { + overlapped.add(current.intersection(interval)); } } - currents = overlapped; + rangeMap.put(extracted, overlapped); } } } else { - String extracted = extractSoleColumn(root, leaves, null); - if (TIME_COLUMN_NAME.equals(extracted)) { - currents = extractIntervals(root, leaves, false); + String extracted = extractSoleColumn(root, leaves); + if (extracted != null) { + PrimitiveTypeInfo type = types.get(extracted); + List ranges = extractRanges(type, root, leaves, false); + if (ranges != null) { + rangeMap.put(extracted, ranges); + } } } - return currents != null ? JodaUtils.condenseIntervals(currents) : Collections.emptyList(); + + return Maps.transformValues(rangeMap, Ranges.COMPACT); } - private static String extractSoleColumn(ExpressionTree tree, List leaves, String current) + private static String extractSoleColumn(ExpressionTree tree, List leaves) { if (tree.getOperator() == ExpressionTree.Operator.LEAF) { - String columnName = leaves.get(tree.getLeaf()).getColumnName(); - if (current == null || current.equals(columnName)) { - return columnName; - } - return null; + return leaves.get(tree.getLeaf()).getColumnName(); } + String current = null; List children = tree.getChildren(); if (children != null && !children.isEmpty()) { for (ExpressionTree child : children) { - current = extractSoleColumn(child, leaves, current); + String resolved = extractSoleColumn(child, leaves); + if (current != null && !current.equals(resolved)) { + return null; + } + current = resolved; } } return current; } - private static List extractIntervals(ExpressionTree tree, List leaves, boolean withNot) + private static List 
extractRanges( + PrimitiveTypeInfo type, + ExpressionTree tree, + List leaves, + boolean withNot + ) { if (tree.getOperator() == ExpressionTree.Operator.NOT) { - return extractIntervals(tree.getChildren().get(0), leaves, !withNot); + return extractRanges(type, tree.getChildren().get(0), leaves, !withNot); } if (tree.getOperator() == ExpressionTree.Operator.LEAF) { - return leafToIntervals(leaves.get(tree.getLeaf()), withNot); + return leafToRanges(type, leaves.get(tree.getLeaf()), withNot); } if (tree.getOperator() == ExpressionTree.Operator.OR) { - List intervals = Lists.newArrayList(); + List intervals = Lists.newArrayList(); for (ExpressionTree child : tree.getChildren()) { - List extracted = extractIntervals(child, leaves, withNot); + List extracted = extractRanges(type, child, leaves, withNot); if (extracted != null) { intervals.addAll(extracted); } } + return intervals; } return null; } - private static List leafToIntervals(PredicateLeaf hiveLeaf, boolean withNot) + private static List leafToRanges(PrimitiveTypeInfo type, PredicateLeaf hiveLeaf, boolean withNot) { - switch (hiveLeaf.getOperator()) { + PredicateLeaf.Operator operator = hiveLeaf.getOperator(); + switch (operator) { case LESS_THAN: case LESS_THAN_EQUALS: - Long time = literalToTime(hiveLeaf.getLiteral()); - if (time != null) { - return Arrays.asList( - withNot ? new Interval(time, JodaUtils.MAX_INSTANT) : new Interval(JodaUtils.MIN_INSTANT, time) - ); + case EQUALS: // in druid, all equals are null-safe equals + case NULL_SAFE_EQUALS: + Comparable value = literalToType(hiveLeaf.getLiteral(), type); + if (value == null) { + return null; + } + if (operator == PredicateLeaf.Operator.LESS_THAN) { + return Arrays.asList(withNot ? Range.atLeast(value) : Range.lessThan(value)); + } else if (operator == PredicateLeaf.Operator.LESS_THAN_EQUALS) { + return Arrays.asList(withNot ? 
Range.greaterThan(value) : Range.atMost(value)); + } else { + if (!withNot) { + return Arrays.asList(Range.closed(value, value)); + } + return Arrays.asList(Range.lessThan(value), Range.greaterThan(value)); } - return null; case BETWEEN: - Long start = literalToTime(hiveLeaf.getLiteralList().get(0)); - Long end = literalToTime(hiveLeaf.getLiteralList().get(1)); - if (start != null && end != null) { - return withNot ? Arrays.asList( - new Interval(JodaUtils.MIN_INSTANT, start), - new Interval(end, JodaUtils.MAX_INSTANT) - ) : Arrays.asList(new Interval(start, end)); + Comparable value1 = literalToType(hiveLeaf.getLiteralList().get(0), type); + Comparable value2 = literalToType(hiveLeaf.getLiteralList().get(1), type); + if (value1 == null || value2 == null) { + return null; } - return null; + boolean inverted = value1.compareTo(value2) > 0; + if (!withNot) { + return Arrays.asList(inverted ? Range.closed(value2, value1) : Range.closed(value1, value2)); + } + return Arrays.asList( + Range.lessThan(inverted ? value2 : value1), + Range.greaterThan(inverted ? value1 : value2) + ); + case IN: + List ranges = Lists.newArrayList(); + for (Object literal : hiveLeaf.getLiteralList()) { + Comparable element = literalToType(literal, type); + if (element == null) { + return null; + } + if (withNot) { + ranges.addAll(Arrays.asList(Range.lessThan(element), Range.greaterThan(element))); + } else { + ranges.add(Range.closed(element, element)); + } + } + return ranges; } - if (hiveLeaf.getOperator() == PredicateLeaf.Operator.EQUALS && hiveLeaf.getLiteral() instanceof String) { - try { - Interval interval = new Interval(hiveLeaf.getLiteral()); - return withNot ? Arrays.asList( - new Interval(JodaUtils.MIN_INSTANT, interval.getStartMillis()), - new Interval(interval.getEndMillis(), JodaUtils.MAX_INSTANT) - ) : Arrays.asList(interval); - } - catch (IllegalArgumentException e) { - // best effort. 
ignore - } + return null; + } + + private static Comparable literalToType(Object literal, PrimitiveTypeInfo type) + { + switch (type.getPrimitiveCategory()) { + case LONG: + return toLong(literal); + case INT: + return toInt(literal); + case FLOAT: + return toFloat(literal); + case DOUBLE: + return toDouble(literal); + case STRING: + return String.valueOf(literal); } return null; } - private static Long literalToTime(Object literal) + private static Long toLong(Object literal) { - if (literal instanceof Long) { - return (Long) literal; + if (literal instanceof Number) { + return ((Number) literal).longValue(); } if (literal instanceof Date) { return ((Date) literal).getTime(); } + if (literal instanceof Timestamp) { + return ((Timestamp) literal).getTime(); + } if (literal instanceof String) { + try { + return Long.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } try { return DateFormat.getDateInstance().parse((String) literal).getTime(); } @@ -193,4 +349,52 @@ private static Long literalToTime(Object literal) } return null; } + + private static Integer toInt(Object literal) + { + if (literal instanceof Number) { + return ((Number) literal).intValue(); + } + if (literal instanceof String) { + try { + return Integer.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } + } + return null; + } + + private static Float toFloat(Object literal) + { + if (literal instanceof Number) { + return ((Number) literal).floatValue(); + } + if (literal instanceof String) { + try { + return Float.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } + } + return null; + } + + private static Double toDouble(Object literal) + { + if (literal instanceof Number) { + return ((Number) literal).doubleValue(); + } + if (literal instanceof String) { + try { + return Double.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } + } + return null; + } } diff --git 
a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java index 2427171847c0..15cd0442f7d1 100644 --- a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java @@ -19,9 +19,13 @@ package io.druid.hive; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Functions; import com.google.common.collect.Lists; +import com.google.common.collect.Range; import io.druid.indexer.hadoop.QueryBasedInputFormat; +import io.druid.query.filter.AndDimFilter; +import io.druid.query.filter.DimFilter; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -35,12 +39,12 @@ import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; -import org.joda.time.Interval; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Properties; public class HiveDruidInputFormat extends QueryBasedInputFormat implements HiveOutputFormat @@ -72,16 +76,29 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter repo } @Override - protected final Configuration configure(Configuration configuration) + protected final Configuration configure(Configuration configuration, ObjectMapper mapper) + throws IOException { - List intervals = ExpressionConverter.convert(configuration); - if (intervals == null || intervals.isEmpty()) { + Map> converted = ExpressionConverter.convert(configuration); + List timeRanges = converted.remove(ExpressionConverter.TIME_COLUMN_NAME); + if (timeRanges == null || timeRanges.isEmpty()) { throw new IllegalArgumentException("failed to extract intervals from 
predicate"); } configuration.set( CONF_DRUID_INTERVALS, - StringUtils.join(Lists.transform(intervals, Functions.toStringFunction()), ",") + StringUtils.join(Lists.transform(ExpressionConverter.toInterval(timeRanges), Functions.toStringFunction()), ",") ); + + List filters = Lists.newArrayList(); + for (Map.Entry> entry : converted.entrySet()) { + DimFilter filter = ExpressionConverter.toFilter(entry.getKey(), entry.getValue()); + if (filter != null) { + filters.add(filter); + } + } + if (!filters.isEmpty()) { + configuration.set(CONF_DRUID_FILTERS, mapper.writeValueAsString(new AndDimFilter(filters).optimize())); + } return configuration; } diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/Ranges.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/Ranges.java new file mode 100644 index 000000000000..777b25da56ad --- /dev/null +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/Ranges.java @@ -0,0 +1,122 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.hive; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.TreeSet; + +/** + */ +public class Ranges +{ + public static final Predicate VALID = new Predicate() + { + @Override + public boolean apply(Range input) + { + return !input.isEmpty(); + } + }; + + public static final Function, List> COMPACT = new Function, List>() + { + @Override + public List apply(List input) + { + return Ranges.condenseRanges(input); + } + }; + + public static List condenseRanges(List ranges) + { + if (ranges.size() <= 1) { + return ranges; + } + + Comparator startThenEnd = new Comparator() + { + @Override + public int compare(Range lhs, Range rhs) + { + int compare = 0; + if (lhs.hasLowerBound() && rhs.hasLowerBound()) { + compare = lhs.lowerEndpoint().compareTo(rhs.lowerEndpoint()); + } else if (!lhs.hasLowerBound() && rhs.hasLowerBound()) { + compare = -1; + } else if (lhs.hasLowerBound() && !rhs.hasLowerBound()) { + compare = 1; + } + if (compare != 0) { + return compare; + } + if (lhs.hasUpperBound() && rhs.hasUpperBound()) { + compare = lhs.upperEndpoint().compareTo(rhs.upperEndpoint()); + } else if (!lhs.hasUpperBound() && rhs.hasUpperBound()) { + compare = -1; + } else if (lhs.hasUpperBound() && !rhs.hasUpperBound()) { + compare = 1; + } + return compare; + } + }; + + TreeSet sortedIntervals = Sets.newTreeSet(startThenEnd); + sortedIntervals.addAll(ranges); + + List retVal = Lists.newArrayList(); + + Iterator intervalsIter = sortedIntervals.iterator(); + Range currInterval = intervalsIter.next(); + while (intervalsIter.hasNext()) { + Range next = intervalsIter.next(); + if (currInterval.encloses(next)) { + continue; + } + if (mergeable(currInterval, next)) { + 
currInterval = currInterval.span(next); + } else { + retVal.add(currInterval); + currInterval = next; + } + } + retVal.add(currInterval); + + return retVal; + } + + public static boolean mergeable(Range range1, Range range2) + { + Comparable x1 = range1.upperEndpoint(); + Comparable x2 = range2.lowerEndpoint(); + int compare = x1.compareTo(x2); + return compare > 0 || (compare == 0 + && range1.upperBoundType() == BoundType.CLOSED + && range2.lowerBoundType() == BoundType.CLOSED); + } +} diff --git a/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java index 907a0dae1891..3b7aa86c6d4e 100644 --- a/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java +++ b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java @@ -19,109 +19,285 @@ package io.druid.hive; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Functions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.filter.DimFilter; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; -import org.apache.hadoop.hive.ql.udf.UDFToDouble; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; import org.junit.Test; -import java.sql.Date; import java.util.Arrays; +import java.util.List; +import java.util.Map; public class ExpressionConverterTest { + private final ObjectMapper mapper = new DefaultObjectMapper(); + @Test - public void test() + public void test() throws Exception { + Map types = Maps.newHashMap(); + types.put("__time", TypeInfoFactory.longTypeInfo); + types.put("col1", TypeInfoFactory.stringTypeInfo); + types.put("col2", TypeInfoFactory.stringTypeInfo); + ExprNodeColumnDesc longTime = new ExprNodeColumnDesc(Long.class, "__time", "some_table", false); - ExprNodeColumnDesc dateTime = new ExprNodeColumnDesc(Date.class, "__time", "some_table", false); ExprNodeColumnDesc someColumn1 = new ExprNodeColumnDesc(String.class, "col1", "some_table", false); ExprNodeColumnDesc someColumn2 = new ExprNodeColumnDesc(String.class, "col2", "some_table", false); - GenericUDFBridge timeAsDouble = new GenericUDFBridge("double", false, UDFToDouble.class.getName()); - ExprNodeGenericFuncDesc gt = new ExprNodeGenericFuncDesc( + // cannot do this +// ExprNodeGenericFuncDesc timeCastToDouble = new ExprNodeGenericFuncDesc( +// PrimitiveObjectInspectorFactory.javaLongObjectInspector, +// new GenericUDFBridge("double", false, UDFToDouble.class.getName()), Arrays.asList(longTime) +// ); + + ExprNodeGenericFuncDesc noise1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPNotEqual(), + Arrays.asList( + someColumn1, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value1") 
+ ) + ); + ExprNodeGenericFuncDesc noise2 = new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFOPGreaterThan(), Arrays.asList( - new ExprNodeGenericFuncDesc(PrimitiveObjectInspectorFactory.javaLongObjectInspector, - timeAsDouble, Arrays.asList(longTime)), - new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 1031555555123L) + someColumn2, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value2") ) ); - ExprNodeGenericFuncDesc lt = new ExprNodeGenericFuncDesc( + ExprNodeGenericFuncDesc noise3 = new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, - new GenericUDFOPLessThan(), + new GenericUDFIn(), Arrays.asList( - longTime, - new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 1231555555123L) + someColumn2, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value1"), + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value2") ) ); - ExprNodeGenericFuncDesc noise1 = new ExprNodeGenericFuncDesc( + ExprNodeGenericFuncDesc gt = new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, - new GenericUDFOPLessThan(), + new GenericUDFOPGreaterThan(), Arrays.asList( - someColumn1, - new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value1") + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2010, 1, 1, 0, 0).getMillis()) ) ); - ExprNodeGenericFuncDesc noise2 = new ExprNodeGenericFuncDesc( + ExprNodeGenericFuncDesc lt = new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFOPLessThan(), Arrays.asList( - someColumn2, - new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value2") + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2012, 3, 1, 0, 0).getMillis()) ) ); - ExprNodeGenericFuncDesc range = new ExprNodeGenericFuncDesc( + ExprNodeGenericFuncDesc all = new ExprNodeGenericFuncDesc( + 
PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + Arrays.asList(lt, gt) + ); + ExprNodeGenericFuncDesc mixed1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(noise3, lt) + ); + ExprNodeGenericFuncDesc between0 = new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFOPAnd(), Arrays.asList(lt, gt) ); - ExprNodeGenericFuncDesc between = new ExprNodeGenericFuncDesc( + ExprNodeGenericFuncDesc between1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFBetween(), + Arrays.asList( + new ExprNodeConstantDesc(false), + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2011, 6, 1, 10, 0).getMillis()), + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2016, 4, 1, 12, 0).getMillis()) + ) + ); + ExprNodeGenericFuncDesc between2 = new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFBetween(), Arrays.asList( new ExprNodeConstantDesc(false), - dateTime, - new ExprNodeConstantDesc(TypeInfoFactory.dateTypeInfo, new Date(105, 3, 12)), - new ExprNodeConstantDesc(TypeInfoFactory.dateTypeInfo, new Date(115, 3, 14)) + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2016, 4, 1, 12, 0).getMillis()), + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2017, 1, 1, 12, 10).getMillis()) ) ); - ExprNodeGenericFuncDesc complex1 = new ExprNodeGenericFuncDesc( + ExprNodeGenericFuncDesc intersectAnd = new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFOPAnd(), + Arrays.asList(between0, between1) + ); + ExprNodeGenericFuncDesc intersectOr = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + 
Arrays.asList(between0, between1) + ); + + ExprNodeGenericFuncDesc abutAnd = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(between1, between2) + ); + ExprNodeGenericFuncDesc abutOr = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + Arrays.asList(between1, between2) + ); + + ExprNodeGenericFuncDesc split = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + Arrays.asList(between0, between2) + ); + + ExprNodeGenericFuncDesc complex1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), Arrays.asList( new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFOPAnd(), Arrays.asList(noise1, lt) ), - between, new ExprNodeGenericFuncDesc( PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFOPAnd(), - Arrays.asList(gt, noise2) + Arrays.asList( + between1, + new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(noise2, gt) + ) + ) ) ) ); + validate( + all, + types, + Arrays.asList("(-∞‥+∞)"), + Arrays.asList("-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z") + ); + validate( + mixed1, + types, + Arrays.asList("(-∞‥1330560000000)"), + Arrays.asList("-146136543-09-08T08:23:32.096Z/2012-03-01T00:00:00.000Z") + ); + validate( + between0, + types, + Arrays.asList("(1262304000000‥1330560000000)"), + Arrays.asList("2010-01-01T00:00:00.000Z/2012-03-01T00:00:00.000Z") + ); + validate( + between1, + types, + Arrays.asList("[1306922400000‥1459512000000]"), + Arrays.asList("2011-06-01T10:00:00.000Z/2016-04-01T12:00:00.000Z") + ); + validate( + between2, + types, + Arrays.asList("[1459512000000‥1483272600000]"), + 
Arrays.asList("2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.000Z") + ); - System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(gt)); - System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(lt)); - System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(range)); - System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(between)); + // 0 AND 1 + validate( + intersectAnd, + types, + Arrays.asList("[1306922400000‥1330560000000)"), + Arrays.asList("2011-06-01T10:00:00.000Z/2012-03-01T00:00:00.000Z") + ); + // 0 OR 1 + validate( + intersectOr, + types, + Arrays.asList("(1262304000000‥1459512000000]"), + Arrays.asList("2010-01-01T00:00:00.000Z/2016-04-01T12:00:00.000Z") + ); + // 1 AND 2 + validate( + abutAnd, + types, + Arrays.asList("[1459512000000‥1459512000000]"), + Arrays.asList("2016-04-01T12:00:00.000Z/2016-04-01T12:00:00.000Z") + ); + // 1 OR 2 + validate( + abutOr, + types, + Arrays.asList("[1306922400000‥1483272600000]"), + Arrays.asList("2011-06-01T10:00:00.000Z/2017-01-01T12:10:00.000Z") + ); + // 0 OR 2 + validate( + split, + types, + Arrays.asList("(1262304000000‥1330560000000)", "[1459512000000‥1483272600000]"), + Arrays.asList( + "2010-01-01T00:00:00.000Z/2012-03-01T00:00:00.000Z", + "2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.000Z" + ) + ); - System.out.println("[ExpressionConverterTest/test] " + ExpressionConverter.getIntervals(complex1)); + validate( + complex1, + types, + Arrays.asList("(-∞‥1459512000000]"), + Arrays.asList("-146136543-09-08T08:23:32.096Z/2016-04-01T12:00:00.000Z") + ); + } + + private void validate( + ExprNodeGenericFuncDesc predicate, + Map types, + List expected1, + List expected2 + ) throws Exception + { + Map> converted = ExpressionConverter.getRanges(predicate, types); + List ranges = converted.remove(ExpressionConverter.TIME_COLUMN_NAME); + List intervals = ExpressionConverter.toInterval(ranges); 
+ Assert.assertEquals(expected1, Lists.transform(ranges, Functions.toStringFunction())); + Assert.assertEquals(expected2, Lists.transform(intervals, Functions.toStringFunction())); + for (Map.Entry> entry : converted.entrySet()) { + System.out.println( + mapper.readValue( + mapper.writeValueAsString(ExpressionConverter.toFilter(entry.getKey(), entry.getValue())), DimFilter.class + ) + ); + } } -} \ No newline at end of file +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java index 16fe06f6fc68..ad0d281419a1 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java @@ -24,6 +24,7 @@ import com.google.common.base.Charsets; import com.google.common.base.Functions; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -103,8 +104,9 @@ public class QueryBasedInputFormat extends InputFormat getSplits(JobContext context) throws IOException, Interr protected DruidInputSplit[] getInputSplits(Configuration conf) throws IOException { - conf = configure(conf); - ObjectMapper mapper = new ObjectMapper(); mapper.registerModule(new DruidDefaultSerializersModule()); + conf = configure(conf, mapper); + String brokerAddress = Preconditions.checkNotNull(conf.get(CONF_DRUID_BROKER_ADDRESS), "Missing broker address"); String dataSource = Preconditions.checkNotNull(conf.get(CONF_DRUID_DATASOURCE), "Missing datasource name"); String intervals = Preconditions.checkNotNull(conf.get(CONF_DRUID_INTERVALS), "Missing interval"); + String filters = conf.get(CONF_DRUID_FILTERS); String requestURL = String.format( @@ -171,7 +174,7 @@ protected DruidInputSplit[] getInputSplits(Configuration 
conf) throws IOExceptio logger.info("segments to read [%s]", segments); - long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0); + long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE); if (maxSize > 0) { Collections.shuffle(segments); @@ -186,9 +189,9 @@ protected DruidInputSplit[] getInputSplits(Configuration conf) throws IOExceptio long currentSize = 0; for (LocatedSegmentDescriptor segment : segments) { - if (maxSize > 0 && currentSize + segment.getSize() > maxSize) { - splits.add(toSplit(dataSource, currentGroup)); - currentGroup = Lists.newArrayList(); + if (maxSize < 0 || maxSize > 0 && currentSize + segment.getSize() > maxSize) { + splits.add(toSplit(dataSource, filters, currentGroup)); + currentGroup.clear(); currentSize = 0; } @@ -197,7 +200,7 @@ protected DruidInputSplit[] getInputSplits(Configuration conf) throws IOExceptio } if (!currentGroup.isEmpty()) { - splits.add(toSplit(dataSource, currentGroup)); + splits.add(toSplit(dataSource, filters, currentGroup)); } logger.info("Number of splits [%d]", splits.size()); @@ -206,7 +209,8 @@ protected DruidInputSplit[] getInputSplits(Configuration conf) throws IOExceptio @Override public org.apache.hadoop.mapred.RecordReader getRecordReader( - org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) throws IOException + org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter + ) throws IOException { DruidRecordReader reader = new DruidRecordReader(); reader.initialize((DruidInputSplit) split, job); @@ -222,25 +226,19 @@ public RecordReader createRecordReader( return new DruidRecordReader(); } - private DruidInputSplit toSplit(String dataSource, List segments) + private DruidInputSplit toSplit(String dataSource, String filters, List segments) { - String[] locations = null; - try { - locations = getFrequentLocations(segments); - } - catch (Exception e) { - logger.error(e, "Exception thrown finding location of splits"); - } long size = 0; List intervals = 
Lists.newArrayList(); for (LocatedSegmentDescriptor segment : segments) { size += segment.getSize(); intervals.add(segment.getInterval()); } - return new DruidInputSplit(dataSource, locations, intervals, size); + String[] locations = getFrequentLocations(segments); + return new DruidInputSplit(dataSource, intervals, filters, locations, size); } - private String[] getFrequentLocations(List segments) throws IOException + private String[] getFrequentLocations(List segments) { List locations = Lists.newArrayList(); for (LocatedSegmentDescriptor segment : segments) { @@ -288,6 +286,7 @@ public int compare(Pair o1, Pair o2) public static final class DruidInputSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit { private String dataSource; + private String filters; private List intervals; private String[] locations; private long length; @@ -297,11 +296,17 @@ public DruidInputSplit() { } - public DruidInputSplit(String dataSource, String[] locations, List intervals, long length) + public DruidInputSplit( + String dataSource, + List intervals, + String filters, + String[] locations, + long length + ) { - this(); this.dataSource = dataSource; this.intervals = intervals; + this.filters = filters; this.locations = locations; this.length = length; } @@ -323,6 +328,11 @@ public String getDataSource() return dataSource; } + public String getFilters() + { + return filters; + } + public List getIntervals() { return intervals; @@ -336,6 +346,7 @@ public void write(DataOutput out) throws IOException for (String interval : Lists.transform(intervals, Functions.toStringFunction())) { out.writeUTF(interval); } + out.writeUTF(Strings.nullToEmpty(filters)); out.writeInt(locations.length); for (String location : locations) { out.writeUTF(location); @@ -351,6 +362,7 @@ public void readFields(DataInput in) throws IOException for (int i = in.readInt(); i > 0; i--) { intervals.add(new Interval(in.readUTF())); } + filters = in.readUTF(); locations = new String[in.readInt()]; 
for (int i = 0; i < locations.length; i++) { locations[i] = in.readUTF(); @@ -363,7 +375,8 @@ public String toString() { return "DruidInputSplit{" + "dataSource=" + dataSource + - "intervals=" + intervals + + ", intervals=" + intervals + + ", filters=" + filters + ", locations=" + Arrays.toString(locations) + '}'; } @@ -426,7 +439,7 @@ public void initialize(DruidInputSplit split, Configuration configuration) throw builder.dimensionSpecs(dimensionSpecs); - String filters = configuration.get(CONF_DRUID_FILTERS); + String filters = split.getFilters(); if (filters != null && !filters.isEmpty()) { builder.filters(mapper.readValue(filters, DimFilter.class)); } @@ -441,17 +454,18 @@ public void initialize(DruidInputSplit split, Configuration configuration) throw catch (Exception e) { throw new IOException(e); } + + if (logger.isInfoEnabled()) { + logger.info("Retrieving from druid using query.. " + nextQuery()); + } } private void nextPage() throws IOException, InterruptedException { - PagingSpec pagingSpec = new PagingSpec(paging, threshold, true); - SelectQuery query = builder.pagingSpec(pagingSpec).build(); - StatusResponseHolder response; try { response = client.go( - request.setContent(mapper.writeValueAsBytes(query)) + request.setContent(mapper.writeValueAsBytes(nextQuery())) .setHeader( HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON @@ -484,6 +498,12 @@ private void nextPage() throws IOException, InterruptedException } } + private SelectQuery nextQuery() + { + PagingSpec pagingSpec = new PagingSpec(paging, threshold, true); + return builder.pagingSpec(pagingSpec).build(); + } + @Override public boolean nextKeyValue() throws IOException, InterruptedException { From ade75cfdc254f08e64c53149aa0835e2e80e5b3a Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Sat, 23 Apr 2016 19:29:40 +0900 Subject: [PATCH 6/7] Add storage handler --- ...tFormat.java => DruidHiveInputFormat.java} | 2 +- ...iveDruidSerDe.java => DruidHiveSerDe.java} | 4 +- 
.../druid/hive/DruidHiveStorageHandler.java} | 27 ++++++++++++-- .../io/druid/hive/ExpressionConverter.java | 37 ++++++++++++++++++- .../druid/hive/ExpressionConverterTest.java | 24 ++++++------ .../indexer/hadoop/QueryBasedInputFormat.java | 6 ++- 6 files changed, 80 insertions(+), 20 deletions(-) rename extensions-core/hive-extensions/src/main/java/io/druid/hive/{HiveDruidInputFormat.java => DruidHiveInputFormat.java} (98%) rename extensions-core/hive-extensions/src/main/java/io/druid/hive/{HiveDruidSerDe.java => DruidHiveSerDe.java} (97%) rename extensions-core/hive-extensions/src/{test/java/io/druid/hive/HiveDruidInputFormatTest.java => main/java/io/druid/hive/DruidHiveStorageHandler.java} (57%) diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java similarity index 98% rename from extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java rename to extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java index 15cd0442f7d1..4e1c26576884 100644 --- a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidInputFormat.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java @@ -47,7 +47,7 @@ import java.util.Map; import java.util.Properties; -public class HiveDruidInputFormat extends QueryBasedInputFormat implements HiveOutputFormat +public class DruidHiveInputFormat extends QueryBasedInputFormat implements HiveOutputFormat { @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidSerDe.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveSerDe.java similarity index 97% rename from extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidSerDe.java rename to 
extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveSerDe.java index c09763ab3686..52b40375c2e7 100644 --- a/extensions-core/hive-extensions/src/main/java/io/druid/hive/HiveDruidSerDe.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveSerDe.java @@ -43,9 +43,9 @@ /** */ -public class HiveDruidSerDe extends AbstractSerDe +public class DruidHiveSerDe extends AbstractSerDe { - private static final Logger logger = new Logger(HiveDruidSerDe.class); + private static final Logger logger = new Logger(DruidHiveSerDe.class); private String[] columns; private ObjectInspector inspector; diff --git a/extensions-core/hive-extensions/src/test/java/io/druid/hive/HiveDruidInputFormatTest.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveStorageHandler.java similarity index 57% rename from extensions-core/hive-extensions/src/test/java/io/druid/hive/HiveDruidInputFormatTest.java rename to extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveStorageHandler.java index e0c4cce8a12c..c929eff5f134 100644 --- a/extensions-core/hive-extensions/src/test/java/io/druid/hive/HiveDruidInputFormatTest.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveStorageHandler.java @@ -19,9 +19,30 @@ package io.druid.hive; -import static org.junit.Assert.*; +import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; -public class HiveDruidInputFormatTest +/** + */ +public class DruidHiveStorageHandler extends DefaultStorageHandler { + @Override + public Class getInputFormatClass() + { + return DruidHiveInputFormat.class; + } + + @Override + public Class getOutputFormatClass() + { + return DruidHiveInputFormat.class; + } -} \ No newline at end of file + @Override + public Class getSerDeClass() + { + return DruidHiveSerDe.class; + } +} diff --git 
a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java index 8206cba86d16..c21e3aa5887d 100644 --- a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java @@ -34,6 +34,7 @@ import io.druid.query.filter.InDimFilter; import io.druid.query.filter.OrDimFilter; import io.druid.query.filter.SelectorDimFilter; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; @@ -93,7 +94,7 @@ public Interval apply(Range range) start++; } if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) { - end--; + end++; } return new Interval(start, end); } @@ -203,7 +204,11 @@ static Map> getRanges(ExprNodeGenericFuncDesc filterExpr, Ma } } - return Maps.transformValues(rangeMap, Ranges.COMPACT); + Map> rangesMap = Maps.transformValues(rangeMap, Ranges.COMPACT); + for (Map.Entry> entry : rangesMap.entrySet()) { + logger.info(">> " + entry); + } + return rangesMap; } private static String extractSoleColumn(ExpressionTree tree, List leaves) @@ -318,6 +323,34 @@ private static Comparable literalToType(Object literal, PrimitiveTypeInfo type) return toDouble(literal); case STRING: return String.valueOf(literal); + case TIMESTAMP: + return toTimestamp(literal); + } + return null; + } + + private static Comparable toTimestamp(Object literal) + { + if (literal instanceof Timestamp) { + return (Timestamp)literal; + } + if (literal instanceof Date) { + return new Timestamp(((Date) literal).getTime()); + } + if (literal instanceof Number) { + return new Timestamp(((Number) literal).longValue()); + } + if (literal instanceof String) { + String string = (String) literal; + if
(StringUtils.isNumeric(string)) { + return new Timestamp(Long.valueOf(string)); + } + try { + return Timestamp.valueOf(string); + } + catch (IllegalArgumentException e) { + // not a valid JDBC timestamp literal; fall through and return null + } } return null; } diff --git a/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java index 3b7aa86c6d4e..48f548c2b9dd 100644 --- a/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java +++ b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java @@ -45,6 +45,7 @@ import org.junit.Assert; import org.junit.Test; +import java.sql.Timestamp; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -62,6 +63,7 @@ public void test() throws Exception types.put("col2", TypeInfoFactory.stringTypeInfo); ExprNodeColumnDesc longTime = new ExprNodeColumnDesc(Long.class, "__time", "some_table", false); + ExprNodeColumnDesc timestampTime = new ExprNodeColumnDesc(Timestamp.class, "__time", "some_table", false); ExprNodeColumnDesc someColumn1 = new ExprNodeColumnDesc(String.class, "col1", "some_table", false); ExprNodeColumnDesc someColumn2 = new ExprNodeColumnDesc(String.class, "col2", "some_table", false); @@ -101,8 +103,8 @@ public void test() throws Exception PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, new GenericUDFOPGreaterThan(), Arrays.asList( - longTime, - new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2010, 1, 1, 0, 0).getMillis()) + timestampTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new Timestamp(new DateTime(2010, 1, 1, 0, 0).getMillis())) ) ); ExprNodeGenericFuncDesc lt = new ExprNodeGenericFuncDesc( @@ -218,19 +220,19 @@ public void test() throws Exception between0, types, Arrays.asList("(1262304000000‥1330560000000)"), - Arrays.asList("2010-01-01T00:00:00.000Z/2012-03-01T00:00:00.000Z") +
Arrays.asList("2010-01-01T00:00:00.001Z/2012-03-01T00:00:00.000Z") ); validate( between1, types, Arrays.asList("[1306922400000‥1459512000000]"), - Arrays.asList("2011-06-01T10:00:00.000Z/2016-04-01T12:00:00.000Z") + Arrays.asList("2011-06-01T10:00:00.000Z/2016-04-01T12:00:00.001Z") ); validate( between2, types, Arrays.asList("[1459512000000‥1483272600000]"), - Arrays.asList("2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.000Z") + Arrays.asList("2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.001Z") ); // 0 AND 1 @@ -245,21 +247,21 @@ public void test() throws Exception intersectOr, types, Arrays.asList("(1262304000000‥1459512000000]"), - Arrays.asList("2010-01-01T00:00:00.000Z/2016-04-01T12:00:00.000Z") + Arrays.asList("2010-01-01T00:00:00.001Z/2016-04-01T12:00:00.001Z") ); // 1 AND 2 validate( abutAnd, types, Arrays.asList("[1459512000000‥1459512000000]"), - Arrays.asList("2016-04-01T12:00:00.000Z/2016-04-01T12:00:00.000Z") + Arrays.asList("2016-04-01T12:00:00.000Z/2016-04-01T12:00:00.001Z") ); // 1 OR 2 validate( abutOr, types, Arrays.asList("[1306922400000‥1483272600000]"), - Arrays.asList("2011-06-01T10:00:00.000Z/2017-01-01T12:10:00.000Z") + Arrays.asList("2011-06-01T10:00:00.000Z/2017-01-01T12:10:00.001Z") ); // 0 OR 2 validate( @@ -267,8 +269,8 @@ public void test() throws Exception types, Arrays.asList("(1262304000000‥1330560000000)", "[1459512000000‥1483272600000]"), Arrays.asList( - "2010-01-01T00:00:00.000Z/2012-03-01T00:00:00.000Z", - "2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.000Z" + "2010-01-01T00:00:00.001Z/2012-03-01T00:00:00.000Z", + "2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.001Z" ) ); @@ -276,7 +278,7 @@ public void test() throws Exception complex1, types, Arrays.asList("(-∞‥1459512000000]"), - Arrays.asList("-146136543-09-08T08:23:32.096Z/2016-04-01T12:00:00.000Z") + Arrays.asList("-146136543-09-08T08:23:32.096Z/2016-04-01T12:00:00.001Z") ); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java 
b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java index ad0d281419a1..bd3414009d5b 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java @@ -189,7 +189,11 @@ protected DruidInputSplit[] getInputSplits(Configuration conf) throws IOExceptio long currentSize = 0; for (LocatedSegmentDescriptor segment : segments) { - if (maxSize < 0 || maxSize > 0 && currentSize + segment.getSize() > maxSize) { + if (maxSize < 0) { + splits.add(toSplit(dataSource, filters, Arrays.asList(segment))); + continue; + } + if (maxSize > 0 && currentSize + segment.getSize() > maxSize) { splits.add(toSplit(dataSource, filters, currentGroup)); currentGroup.clear(); currentSize = 0; From 25cc89b1b822862f7f8a2297b5077e7975b3a57b Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Sun, 24 Apr 2016 16:57:47 +0900 Subject: [PATCH 7/7] fix log --- .../io/druid/hive/DruidHiveInputFormat.java | 13 +++- .../io/druid/hive/ExpressionConverter.java | 12 +-- .../hadoop/QueryBasedInputFormatTest.java | 73 ------------------- 3 files changed, 15 insertions(+), 83 deletions(-) delete mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java index 4e1c26576884..16644680ebc3 100644 --- a/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import 
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -79,7 +81,8 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter repo protected final Configuration configure(Configuration configuration, ObjectMapper mapper) throws IOException { - Map> converted = ExpressionConverter.convert(configuration); + Map types = ExpressionConverter.getColumnTypes(configuration); + Map> converted = ExpressionConverter.convert(configuration, types); List timeRanges = converted.remove(ExpressionConverter.TIME_COLUMN_NAME); if (timeRanges == null || timeRanges.isEmpty()) { throw new IllegalArgumentException("failed to extract intervals from predicate"); @@ -91,9 +94,11 @@ protected final Configuration configure(Configuration configuration, ObjectMappe List filters = Lists.newArrayList(); for (Map.Entry> entry : converted.entrySet()) { - DimFilter filter = ExpressionConverter.toFilter(entry.getKey(), entry.getValue()); - if (filter != null) { - filters.add(filter); + if (types.get(entry.getKey()).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) { + DimFilter filter = ExpressionConverter.toFilter(entry.getKey(), entry.getValue()); + if (filter != null) { + filters.add(filter); + } } } if (!filters.isEmpty()) { diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java index c21e3aa5887d..b1885a4a867c 100644 --- a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java @@ -69,15 +69,14 @@ public class ExpressionConverter public static final String TIME_COLUMN_NAME = "__time"; - static Map> convert(Configuration configuration) + static Map> convert(Configuration 
configuration, Map types) { String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR); if (filterExprSerialized == null) { logger.info("No predicate is pushed down"); return Collections.emptyMap(); } - ExprNodeGenericFuncDesc expr = SerializationUtilities.deserializeExpression(filterExprSerialized); - return getRanges(expr, getColumnTypes(configuration)); + return getRanges(SerializationUtilities.deserializeExpression(filterExprSerialized), types); } public static List toInterval(List ranges) @@ -134,7 +133,7 @@ public static DimFilter toFilter(String dimension, List ranges) return dimFilter; } - private static Map getColumnTypes(Configuration configuration) + public static Map getColumnTypes(Configuration configuration) { String[] colNames = configuration.getStrings(serdeConstants.LIST_COLUMNS); String[] colTypes = configuration.getStrings(serdeConstants.LIST_COLUMN_TYPES); @@ -150,8 +149,9 @@ private static Map getColumnTypes(Configuration confi String colName = colNames[i].trim(); PrimitiveTypeInfo typeInfo = TypeInfoFactory.getPrimitiveTypeInfo(colTypes[i]); if (colName.equals(ExpressionConverter.TIME_COLUMN_NAME) && - typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG) { - logger.warn("time column should be defined as bigint type, yet"); + typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG && + typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP) { + logger.warn("time column should be defined as bigint or timestamp type"); } typeMap.put(colName, typeInfo); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java deleted file mode 100644 index 9f36162d976b..000000000000 --- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/QueryBasedInputFormatTest.java +++ /dev/null @@ -1,73 +0,0 @@ -package 
io.druid.indexer.hadoop; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.easymock.EasyMock; -import org.junit.Assert; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -/** - */ -public class QueryBasedInputFormatTest -{ - @Test - public void test() throws IOException, InterruptedException - { - Configuration conf = new Configuration(); - conf.set(QueryBasedInputFormat.CONF_DRUID_BROKER_ADDRESS, "http://localhost:8082"); - conf.set(QueryBasedInputFormat.CONF_DRUID_DATASOURCE, "wikipedia"); - conf.set(QueryBasedInputFormat.CONF_DRUID_INTERVALS, "2010-12-02T03:00:00.000Z/2015-12-02T04:00:00.000Z"); - conf.set(QueryBasedInputFormat.CONF_SELECT_COLUMNS, "__time, page, language, count, added, delta, deleted"); - - JobContext context = EasyMock.createMock(JobContext.class); - EasyMock.expect(context.getConfiguration()).andReturn(conf); - EasyMock.replay(context); - - TaskAttemptContext attemptContext = EasyMock.createMock(TaskAttemptContext.class); - EasyMock.expect(attemptContext.getConfiguration()).andReturn(conf); - EasyMock.replay(attemptContext); - - QueryBasedInputFormat format = new QueryBasedInputFormat(); - Assert.assertTrue(format instanceof InputFormat); - Assert.assertTrue(format instanceof org.apache.hadoop.mapred.InputFormat); - for (InputSplit split : format.getSplits(context)) { - QueryBasedInputFormat.DruidInputSplit dsplit = (QueryBasedInputFormat.DruidInputSplit) split; - - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - dsplit.write(new 
DataOutputStream(bout)); - - dsplit = new QueryBasedInputFormat.DruidInputSplit(); - dsplit.readFields(new DataInputStream(new ByteArrayInputStream(bout.toByteArray()))); - System.out.println("[DruidInputFormatTest/test] " + split); - RecordReader reader = format.createRecordReader(dsplit, null); - reader.initialize(split, attemptContext); - - while (reader.nextKeyValue()) { - System.out.println("[DruidInputFormatTest/test] " + reader.getCurrentValue()); - } - reader.close(); - - org.apache.hadoop.mapred.RecordReader reader2 = - format.getRecordReader(dsplit, new JobConf(conf), null); - - NullWritable key = reader2.createKey(); - MapWritable value = reader2.createValue(); - while (reader2.next(key, value)) { - System.out.println("[DruidInputFormatTest/test] " + value); - } - reader2.close(); - } - } -}