Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.guice.annotations.Smile;
import io.druid.query.DataSourceUtil;
import io.druid.query.Query;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.router.QueryHostFinder;
import org.jboss.netty.buffer.ChannelBuffer;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
private static final Joiner COMMA_JOIN = Joiner.on(",");

private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QueryHostFinder hostFinder;
Expand All @@ -74,6 +76,7 @@ public class AsyncQueryForwardingServlet extends HttpServlet
private final RequestLogger requestLogger;

public AsyncQueryForwardingServlet(
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QueryHostFinder hostFinder,
Expand All @@ -82,6 +85,7 @@ public AsyncQueryForwardingServlet(
RequestLogger requestLogger
)
{
this.config = config;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.hostFinder = hostFinder;
Expand All @@ -95,6 +99,8 @@ protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException
{
final AsyncContext asyncContext = req.startAsync(req, res);
// default async timeout to be same as maxIdleTime for now
asyncContext.setTimeout(config.getMaxIdleTime().toStandardDuration().getMillis());
asyncContext.start(
new Runnable()
{
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/io/druid/server/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import org.joda.time.DateTime;

Expand Down Expand Up @@ -70,6 +71,7 @@ public class QueryResource
public static final String APPLICATION_SMILE = "application/smile";
public static final String APPLICATION_JSON = "application/json";

private final ServerConfig config;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final QuerySegmentWalker texasRanger;
Expand All @@ -79,6 +81,7 @@ public class QueryResource

@Inject
public QueryResource(
ServerConfig config,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
QuerySegmentWalker texasRanger,
Expand All @@ -87,6 +90,7 @@ public QueryResource(
QueryManager queryManager
)
{
this.config = config;
this.jsonMapper = jsonMapper.copy();
this.jsonMapper.getFactory().configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);

Expand Down Expand Up @@ -135,6 +139,15 @@ public Response doPost(
queryId = UUID.randomUUID().toString();
query = query.withId(queryId);
}
if (query.getContextValue("timeout") == null) {
query = query.withOverriddenContext(
ImmutableMap.of(
"timeout",
config.getMaxIdleTime().toStandardDuration().getMillis()
)
);
}


if (log.isDebugEnabled()) {
log.debug("Got query [%s]", query);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.server.router;

import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;

/**
*/
public class PriorityTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
final int priority = query.getContextPriority(0);

if (priority < 0) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
}

return Optional.absent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public class TieredBrokerConfig
@NotNull
private Period pollPeriod = new Period("PT1M");

@JsonProperty
@NotNull
private String strategies = "[{\"type\":\"timeBoundary\"},{\"type\":\"priority\"}]";

// tier, <bard, numThreads>
public LinkedHashMap<String, String> getTierToBrokerMap()
{
Expand Down Expand Up @@ -88,4 +92,9 @@ public Period getPollPeriod()
{
return pollPeriod;
}

public String getStrategies()
{
return strategies;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.server.router;

import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
Expand All @@ -30,7 +31,6 @@
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
Expand All @@ -50,6 +50,7 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
private final TieredBrokerConfig tierConfig;
private final ServerDiscoveryFactory serverDiscoveryFactory;
private final ConcurrentHashMap<String, ServerDiscoverySelector> selectorMap = new ConcurrentHashMap<>();
private final List<TieredBrokerSelectorStrategy> strategies;

private final Object lock = new Object();

Expand All @@ -59,12 +60,14 @@ public class TieredBrokerHostSelector<T> implements HostSelector<T>
public TieredBrokerHostSelector(
CoordinatorRuleManager ruleManager,
TieredBrokerConfig tierConfig,
ServerDiscoveryFactory serverDiscoveryFactory
ServerDiscoveryFactory serverDiscoveryFactory,
List<TieredBrokerSelectorStrategy> strategies
)
{
this.ruleManager = ruleManager;
this.tierConfig = tierConfig;
this.serverDiscoveryFactory = serverDiscoveryFactory;
this.strategies = strategies;
}

@LifecycleStart
Expand Down Expand Up @@ -128,12 +131,12 @@ public Pair<String, ServerDiscoverySelector> select(final Query<T> query)

String brokerServiceName = null;

// Somewhat janky way of always selecting highest priority broker for this type of query
if (query instanceof TimeBoundaryQuery) {
brokerServiceName = Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
);
for (TieredBrokerSelectorStrategy strategy : strategies) {
final Optional<String> optionalName = strategy.getBrokerServiceName(tierConfig, query);
if (optionalName.isPresent()) {
brokerServiceName = optionalName.get();
break;
}
}

if (brokerServiceName == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.server.router;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Lists;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.google.inject.Provider;

import java.util.List;

/**
*/
public class TieredBrokerSelectorStrategiesProvider implements Provider<List<TieredBrokerSelectorStrategy>>
{
private final List<TieredBrokerSelectorStrategy> strategies = Lists.newArrayList();

@Inject
public TieredBrokerSelectorStrategiesProvider(ObjectMapper jsonMapper, TieredBrokerConfig config)
{
try {
this.strategies.addAll(
(List<TieredBrokerSelectorStrategy>) jsonMapper.readValue(
config.getStrategies(), new TypeReference<List<TieredBrokerSelectorStrategy>>()
{
}
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

@Override
public List<TieredBrokerSelectorStrategy> get()
{
return strategies;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.server.router;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import io.druid.query.Query;

/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "timeBoundary", value = TimeBoundaryTieredBrokerSelectorStrategy.class),
@JsonSubTypes.Type(name = "priority", value = PriorityTieredBrokerSelectorStrategy.class)
})

public interface TieredBrokerSelectorStrategy
{
public Optional<String> getBrokerServiceName(TieredBrokerConfig config, Query query);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.server.router;

import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import io.druid.query.Query;
import io.druid.query.timeboundary.TimeBoundaryQuery;

/**
*/
public class TimeBoundaryTieredBrokerSelectorStrategy implements TieredBrokerSelectorStrategy
{
@Override
public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, Query query)
{
// Somewhat janky way of always selecting highest priority broker for this type of query
if (query instanceof TimeBoundaryQuery) {
return Optional.of(
Iterables.getFirst(
tierConfig.getTierToBrokerMap().values(),
tierConfig.getDefaultBrokerServiceName()
)
);
}

return Optional.absent();
}
}
Loading