Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.common.utils;

import com.google.common.collect.ImmutableMap;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Map;

public class ServletResourceUtils
{
/**
* Sanitize the exception as a map of "error" to information about the exception.
*
* This method explicitly suppresses the stack trace and any other logging. Any logging should be handled by the caller.
* @param t The exception to sanitize
* @return An immutable Map with a single entry which maps "error" to information about the error suitable for passing as an entity in a servlet error response.
*/
public static @NotNull Map<String, String> sanitizeException(@Nullable Throwable t)
{
return ImmutableMap.of(
"error",
t == null ? "null" : (t.getMessage() == null ? t.toString() : t.getMessage())
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.common.utils;

import org.junit.Assert;
import org.junit.Test;

public class ServletResourceUtilsTest
{

@Test
public void testSanitizeException() throws Exception
{
final String message = "some message";
Assert.assertEquals(message, ServletResourceUtils.sanitizeException(new Throwable(message)).get("error"));
Assert.assertEquals("null", ServletResourceUtils.sanitizeException(null).get("error"));
Assert.assertEquals(message, ServletResourceUtils.sanitizeException(new Throwable()
{
@Override
public String toString()
{
return message;
}
}).get("error"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public String getConnectorPath()
return (null == connectorPath) ? defaultPath("connector") : connectorPath;
}

protected String defaultPath(final String subPath)
public String defaultPath(final String subPath)
{
return ZKPaths.makePath(getBase(), subPath);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.listener.announcer;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.concurrent.Execs;
import io.druid.server.DruidNode;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ListenerDiscoverer
{
private static final Logger LOG = new Logger(ListenerDiscoverer.class);
private final ConcurrentMap<String, ServiceCache<Void>> services = new ConcurrentHashMap<>();
private final ServiceDiscovery<Void> serviceDiscovery;
private final Object startStopSync = new Object();
private volatile boolean started = false;

@Inject
public ListenerDiscoverer(
CuratorFramework cf,
ListeningAnnouncerConfig listeningAnnouncerConfig
)
{
this(
ServiceDiscoveryBuilder
.builder(Void.class)
.basePath(listeningAnnouncerConfig.getListenersPath())
.client(cf)
.watchInstances(false)
.build()
);
}

// Exposed for unit tests
ListenerDiscoverer(
ServiceDiscovery<Void> serviceDiscovery
)
{
this.serviceDiscovery = serviceDiscovery;
}

@LifecycleStart
public void start()
{
synchronized (startStopSync) {
if (started) {
LOG.debug("Already started");
return;
}
try {
serviceDiscovery.start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
started = true;
}
}

@LifecycleStop
public void stop()
{
synchronized (startStopSync) {
if (!started) {
LOG.debug("Already stopped");
return;
}
final Closer closer = Closer.create();
closer.register(serviceDiscovery);
for (ServiceCache<Void> serviceCache : services.values()) {
closer.register(serviceCache);
}
try {
closer.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
services.clear();
started = false;
}
}

/**
* Get nodes at a particular listener.
* This method lazily adds service discovery
*
* @param listener_key The Listener's service key
*
* @return A collection of druid nodes as established by the service discovery
*/
public Collection<DruidNode> getNodes(final String listener_key)
{
ServiceCache<Void> serviceCache = services.get(listener_key);
if (serviceCache == null) {
synchronized (startStopSync) {
if (!started) {
throw new ISE("ListenerDiscoverer not started");
}
serviceCache = serviceDiscovery
.serviceCacheBuilder()
.name(listener_key)
.threadFactory(Execs.makeThreadFactory("ListenerDiscoverer--%s"))
.build();
try {
serviceCache.start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
if (services.putIfAbsent(listener_key, serviceCache) != null) {
try {
serviceCache.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
serviceCache = services.get(listener_key);
if (serviceCache == null) {
throw new ISE("Race condition on listener key [%s]. Should not happen!", listener_key);
}
}
}
}
return ImmutableList.copyOf(Collections2.filter(
Lists.transform(
serviceCache.getInstances(),
new Function<ServiceInstance<Void>, DruidNode>()
{
@Nullable
@Override
public DruidNode apply(@Nullable ServiceInstance<Void> input)
{
if (input == null) {
LOG.debug("Instance for listener group [%s] was null", listener_key);
return null;
}
return new DruidNode(input.getName(), input.getAddress(), input.getPort());
}
}
), new Predicate<DruidNode>()
{
@Override
public boolean apply(@Nullable DruidNode input)
{
return input != null;
}
}
));
}
}
Loading