diff --git a/common/src/main/java/io/druid/common/utils/ServletResourceUtils.java b/common/src/main/java/io/druid/common/utils/ServletResourceUtils.java new file mode 100644 index 000000000000..b005853d2869 --- /dev/null +++ b/common/src/main/java/io/druid/common/utils/ServletResourceUtils.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.common.utils; + +import com.google.common.collect.ImmutableMap; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.util.Map; + +public class ServletResourceUtils +{ + /** + * Sanitize the exception as a map of "error" to information about the exception. + * + * This method explicitly suppresses the stack trace and any other logging. Any logging should be handled by the caller. + * @param t The exception to sanitize + * @return An immutable Map with a single entry which maps "error" to information about the error suitable for passing as an entity in a servlet error response. + */ + public static @NotNull Map sanitizeException(@Nullable Throwable t) + { + return ImmutableMap.of( + "error", + t == null ? "null" : (t.getMessage() == null ? t.toString() : t.getMessage()) + ); + } +} diff --git a/common/src/test/java/io/druid/common/utils/ServletResourceUtilsTest.java b/common/src/test/java/io/druid/common/utils/ServletResourceUtilsTest.java new file mode 100644 index 000000000000..f97ab079fc7c --- /dev/null +++ b/common/src/test/java/io/druid/common/utils/ServletResourceUtilsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.common.utils; + +import org.junit.Assert; +import org.junit.Test; + +public class ServletResourceUtilsTest +{ + + @Test + public void testSanitizeException() throws Exception + { + final String message = "some message"; + Assert.assertEquals(message, ServletResourceUtils.sanitizeException(new Throwable(message)).get("error")); + Assert.assertEquals("null", ServletResourceUtils.sanitizeException(null).get("error")); + Assert.assertEquals(message, ServletResourceUtils.sanitizeException(new Throwable() + { + @Override + public String toString() + { + return message; + } + }).get("error")); + } +} diff --git a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java index ed6bf8505546..3335eb820add 100644 --- a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java @@ -90,7 +90,7 @@ public String getConnectorPath() return (null == connectorPath) ? defaultPath("connector") : connectorPath; } - protected String defaultPath(final String subPath) + public String defaultPath(final String subPath) { return ZKPaths.makePath(getBase(), subPath); } diff --git a/server/src/main/java/io/druid/server/listener/announcer/ListenerDiscoverer.java b/server/src/main/java/io/druid/server/listener/announcer/ListenerDiscoverer.java new file mode 100644 index 000000000000..9e7c35a4b510 --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/announcer/ListenerDiscoverer.java @@ -0,0 +1,189 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.announcer; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.io.Closer; +import com.google.inject.Inject; +import com.metamx.common.ISE; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import io.druid.concurrent.Execs; +import io.druid.server.DruidNode; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.discovery.ServiceCache; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class ListenerDiscoverer +{ + private static final Logger LOG = new Logger(ListenerDiscoverer.class); + private final ConcurrentMap> services = new ConcurrentHashMap<>(); + private final ServiceDiscovery serviceDiscovery; + private final Object startStopSync = new Object(); + private volatile boolean started = false; + + @Inject + public ListenerDiscoverer( + CuratorFramework cf, + ListeningAnnouncerConfig listeningAnnouncerConfig + ) + { + this( + ServiceDiscoveryBuilder + .builder(Void.class) + .basePath(listeningAnnouncerConfig.getListenersPath()) + .client(cf) + .watchInstances(false) + .build() + ); + } + + // Exposed for unit tests + ListenerDiscoverer( + ServiceDiscovery serviceDiscovery + ) + { + this.serviceDiscovery = serviceDiscovery; + } + + @LifecycleStart + public void start() + { + synchronized (startStopSync) { + if (started) { + LOG.debug("Already started"); + return; + } + try { + serviceDiscovery.start(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (startStopSync) { + if (!started) { + LOG.debug("Already stopped"); + return; + } + final Closer closer = Closer.create(); + closer.register(serviceDiscovery); + for (ServiceCache serviceCache : services.values()) { + closer.register(serviceCache); + } + try { + closer.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + services.clear(); + started = false; + } + } + + /** + * Get nodes at a particular listener. + * This method lazily adds service discovery + * + * @param listener_key The Listener's service key + * + * @return A collection of druid nodes as established by the service discovery + */ + public Collection getNodes(final String listener_key) + { + ServiceCache serviceCache = services.get(listener_key); + if (serviceCache == null) { + synchronized (startStopSync) { + if (!started) { + throw new ISE("ListenerDiscoverer not started"); + } + serviceCache = serviceDiscovery + .serviceCacheBuilder() + .name(listener_key) + .threadFactory(Execs.makeThreadFactory("ListenerDiscoverer--%s")) + .build(); + try { + serviceCache.start(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + if (services.putIfAbsent(listener_key, serviceCache) != null) { + try { + serviceCache.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + serviceCache = services.get(listener_key); + if (serviceCache == null) { + throw new ISE("Race condition on listener key [%s]. Should not happen!", listener_key); + } + } + } + } + return ImmutableList.copyOf(Collections2.filter( + Lists.transform( + serviceCache.getInstances(), + new Function, DruidNode>() + { + @Nullable + @Override + public DruidNode apply(@Nullable ServiceInstance input) + { + if (input == null) { + LOG.debug("Instance for listener group [%s] was null", listener_key); + return null; + } + return new DruidNode(input.getName(), input.getAddress(), input.getPort()); + } + } + ), new Predicate() + { + @Override + public boolean apply(@Nullable DruidNode input) + { + return input != null; + } + } + )); + } +} diff --git a/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceAnnouncer.java b/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceAnnouncer.java new file mode 100644 index 000000000000..6235b7e9768b --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceAnnouncer.java @@ -0,0 +1,122 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.announcer; + +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.metamx.common.lifecycle.LifecycleStart; +import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.common.logger.Logger; +import io.druid.server.DruidNode; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; +import org.apache.curator.x.discovery.ServiceInstance; +import org.apache.curator.x.discovery.ServiceType; + +import java.io.IOException; + +/** + * Announces that there is a particular ListenerResource at the listener_key. + */ +public abstract class ListenerResourceAnnouncer +{ + private static final Logger LOG = new Logger(ListenerResourceAnnouncer.class); + private final Object startStopSync = new Object(); + private volatile boolean started = false; + private final ServiceDiscovery serviceDiscovery; + private final ServiceInstance me; + + public ListenerResourceAnnouncer( + CuratorFramework cf, + ListeningAnnouncerConfig listeningAnnouncerConfig, + String listener_key, + DruidNode node + ) + { + this( + ServiceDiscoveryBuilder + .builder(Void.class) + .basePath(listeningAnnouncerConfig.getListenersPath()) + .client(cf) + .watchInstances(false) + .build(), + listener_key, + node + ); + } + + ListenerResourceAnnouncer( + ServiceDiscovery serviceDiscovery, + String listener_key, + DruidNode node + ) + { + this.serviceDiscovery = serviceDiscovery; + try { + this.me = ServiceInstance + .builder() + .address(node.getHost()) + .port(node.getPort()) + .name(listener_key) + .serviceType(ServiceType.DYNAMIC) + .build(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @LifecycleStart + public void start() + { + synchronized (startStopSync) { + if (started) { + LOG.debug("Already started, ignoring"); + return; + } + try { + serviceDiscovery.start(); + serviceDiscovery.registerService(me); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + started = true; + } + } + + @LifecycleStop + public void stop() + { + synchronized (startStopSync) { + if (!started) { + LOG.debug("Already stopped, ignoring"); + return; + } + try { + serviceDiscovery.close(); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + started = false; + } + } +} diff --git a/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceAnnouncerModule.java b/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceAnnouncerModule.java new file mode 100644 index 000000000000..69ded33ca213 --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceAnnouncerModule.java @@ -0,0 +1,51 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.announcer; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LifecycleModule; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.DruidModule; + +import java.util.List; +import java.util.UUID; + +public class ListenerResourceAnnouncerModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("ListenerResourceAnnouncerModule-" + UUID.randomUUID().toString()) + .registerSubtypes(ListeningAnnouncerConfig.class) + ); + } + + @Override + public void configure(Binder binder) + { + binder.bind(ListenerDiscoverer.class).in(ManageLifecycle.class); + LifecycleModule.register(binder, ListenerDiscoverer.class); + } +} diff --git a/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceDiscoveryModule.java b/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceDiscoveryModule.java new file mode 100644 index 000000000000..c8f5c7991f41 --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/announcer/ListenerResourceDiscoveryModule.java @@ -0,0 +1,51 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.announcer; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.LifecycleModule; +import io.druid.guice.ManageLifecycle; +import io.druid.initialization.DruidModule; + +import java.util.List; +import java.util.UUID; + +public class ListenerResourceDiscoveryModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("ListenerResourceDiscoveryModule-" + UUID.randomUUID().toString()) + .registerSubtypes(ListeningAnnouncerConfig.class) + ); + } + + @Override + public void configure(Binder binder) + { + binder.bind(ListenerDiscoverer.class).in(ManageLifecycle.class); + LifecycleModule.register(binder, ListenerDiscoverer.class); + } +} diff --git a/server/src/main/java/io/druid/server/listener/announcer/ListeningAnnouncerConfig.java b/server/src/main/java/io/druid/server/listener/announcer/ListeningAnnouncerConfig.java new file mode 100644 index 000000000000..bd73afdbf7ea --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/announcer/ListeningAnnouncerConfig.java @@ -0,0 +1,101 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.announcer; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.inject.Inject; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.utils.ZKPaths; + +/** + * Even though we provide the mechanism to get zk paths here, we do NOT handle announcing and unannouncing in this module. + * The reason is that it is not appropriate to force a global announce/unannounce since individual listeners may have + * different lifecycles. + */ +public class ListeningAnnouncerConfig +{ + @JacksonInject + private final ZkPathsConfig zkPathsConfig; + @JsonProperty("listenersPath") + private String listenersPath = null; + + @Inject + public ListeningAnnouncerConfig( + ZkPathsConfig zkPathsConfig + ) + { + this.zkPathsConfig = zkPathsConfig; + } + + @JsonProperty("listenersPath") + public String getListenersPath() + { + return listenersPath == null ? zkPathsConfig.defaultPath("listeners") : listenersPath; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ListeningAnnouncerConfig that = (ListeningAnnouncerConfig) o; + + return !(listenersPath != null ? !listenersPath.equals(that.listenersPath) : that.listenersPath != null); + + } + + @Override + public int hashCode() + { + return listenersPath != null ? listenersPath.hashCode() : 0; + } + + @Override + public String toString() + { + return "ListeningAnnouncerConfig{" + + "listenersPath='" + listenersPath + '\'' + + '}'; + } + + /** + * Build a path for the particular named listener. The first implementation of this is used with zookeeper, but + * there is nothing restricting its use in a more general pathing (example: http endpoint proxy for raft) + * @param listenerName The key for the listener. + * @return A path appropriate for use in zookeeper to discover the listeners with the particular listener name + */ + public String getAnnouncementPath(String listenerName) + { + return ZKPaths.makePath( + getListenersPath(), Preconditions.checkNotNull( + Strings.emptyToNull(listenerName), "Listener name cannot be null" + ) + ); + } +} diff --git a/server/src/main/java/io/druid/server/listener/resource/AbstractListenerHandler.java b/server/src/main/java/io/druid/server/listener/resource/AbstractListenerHandler.java new file mode 100644 index 000000000000..33b2ade13fab --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/resource/AbstractListenerHandler.java @@ -0,0 +1,221 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.resource; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.metamx.common.logger.Logger; +import io.druid.common.utils.ServletResourceUtils; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.List; + +/** + * This is a simplified handler for announcement listeners. The input is expected to be a JSON list objects. + * + * Empty lists `[]` are taken care of at this level and never passed down to the subclass's handle method. + * + * @param A List of this type is expected in the input stream as JSON. Must be able to be converted to/from Map + */ +public abstract class AbstractListenerHandler implements ListenerHandler +{ + private static final Logger LOG = new Logger(AbstractListenerHandler.class); + private final TypeReference inObjTypeRef; + + /** + * The standard constructor takes in a type reference for the object and for a list of the object. + * This is to work arround some limitations in Java with type erasure. + * + * @param inObjTypeRef The TypeReference for the input object type + */ + public AbstractListenerHandler(TypeReference inObjTypeRef) + { + this.inObjTypeRef = inObjTypeRef; + } + + @NotNull + @Override + public final Response handlePOST(final InputStream inputStream, final ObjectMapper mapper) + { + try { + final Object o = post(ImmutableList.of(mapper.readValue(inputStream, inObjTypeRef))); + return Response.status(Response.Status.ACCEPTED).entity(o).build(); + } + catch (JsonParseException | JsonMappingException e) { + LOG.debug(e, "Bad request"); + return Response.status(Response.Status.BAD_REQUEST).entity(ServletResourceUtils.sanitizeException(e)).build(); + } + catch (Exception e) { + LOG.error(e, "Error handling request"); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @NotNull + @Override + public final Response handlePOSTAll(final InputStream inputStream, final ObjectMapper mapper) + { + final List inObjList; + try { + // This actually fails to properly convert due to type erasure. We'll try again in a second + // This effectively just parses + final List tempList = mapper.readValue(inputStream, new TypeReference>() + { + }); + + // Now do the ACTUAL conversion + inObjList = ImmutableList.copyOf(Lists.transform(tempList, new Function() + { + @Override + public ObjType apply(Object input) + { + return mapper.convertValue(input, inObjTypeRef); + } + })); + } + catch (final IOException ex) { + LOG.debug(ex, "Bad request"); + return Response.status(Response.Status.BAD_REQUEST).entity(ServletResourceUtils.sanitizeException(ex)).build(); + } + final Object returnObj; + try { + returnObj = post(inObjList); + } + catch (Exception e) { + LOG.error(e, "Error handling request"); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + if (returnObj == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } else { + return Response.status(Response.Status.ACCEPTED).entity(returnObj).build(); + } + } + + @Override + @NotNull + public final Response handleGET(@NotNull String id) + { + try { + final Object returnObj = get(id); + if (returnObj == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } else { + return Response.ok(returnObj).build(); + } + } + catch (Exception e) { + LOG.error(e, "Error handling get request for [%s]", id); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @Override + @NotNull + public final Response handleGETAll() + { + final Collection all; + try { + all = getAll(); + if (all == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } else { + return Response.ok(all).build(); + } + } + catch (Exception e) { + LOG.error(e, "Error getting all"); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @Override + @NotNull + public final Response handleDELETE(@NotNull String id) + { + try { + final Object returnObj = delete(id); + if (returnObj == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } else { + return Response.status(Response.Status.ACCEPTED).entity(returnObj).build(); + } + } + catch (Exception e) { + LOG.error(e, "Error in processing delete request for [%s]", id); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @Override + public final void use_AbstractListenerHandler_instead() + { + // NOOP + } + + /** + * Delete the object for a particular id + * + * @param id A string id of the object to be deleted. This id is never null or empty. + * + * @return The object to be returned in the entity. A NULL return will cause a 404 response. A non-null return will cause a 202 response. An Exception thrown will cause a 500 response. + */ + protected abstract + @Nullable + Object delete(@NotNull String id); + + /** + * Get the object for a particular id + * + * @param id A string id of the object desired. This id is never null or empty. + * + * @return The object to be returned in the entity. A NULL return will cause a 404 response. A non-null return will cause a 200 response. An Exception thrown will cause a 500 response. + */ + protected abstract + @Nullable + Object get(@NotNull String id); + + protected abstract + @Nullable + Collection getAll(); + + /** + * Process a POST request of the input items + * + * @param inputObject A list of the objects which were POSTed + * + * @return An object to be returned in the entity of the response. + * + * @throws Exception + */ + public abstract + @Nullable + Object post(@NotNull List inputObject) throws Exception; +} diff --git a/server/src/main/java/io/druid/server/listener/resource/ListenerHandler.java b/server/src/main/java/io/druid/server/listener/resource/ListenerHandler.java new file mode 100644 index 000000000000..6aa7abf2080e --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/resource/ListenerHandler.java @@ -0,0 +1,40 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.resource; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import javax.validation.constraints.NotNull; +import javax.ws.rs.core.Response; +import java.io.InputStream; + +/** + * A handler for events related to the listening-announcer. + * Developers are *STRONGLY* encouraged to use AbstractListenerHandler instead to adhere to return codes. + */ +public interface ListenerHandler +{ + @NotNull Response handlePOST(InputStream inputStream, ObjectMapper mapper); + @NotNull Response handlePOSTAll(InputStream inputStream, ObjectMapper mapper); + @NotNull Response handleGET(@NotNull String id); + @NotNull Response handleGETAll(); + @NotNull Response handleDELETE(@NotNull String id); + void use_AbstractListenerHandler_instead(); +} diff --git a/server/src/main/java/io/druid/server/listener/resource/ListenerResource.java b/server/src/main/java/io/druid/server/listener/resource/ListenerResource.java new file mode 100644 index 000000000000..df616740a524 --- /dev/null +++ b/server/src/main/java/io/druid/server/listener/resource/ListenerResource.java @@ -0,0 +1,183 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.resource; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.api.client.repackaged.com.google.common.base.Strings; +import com.metamx.common.logger.Logger; +import io.druid.common.utils.ServletResourceUtils; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.InputStream; + +/** + * This is a simple announcement resource that handles simple items that have a POST to an announcement endpoint, a + * GET of something in that endpoint with an ID, and a DELETE to that endpoint with an ID. + * + * The idea of this resource is simply to have a simple endpoint for basic POJO handling assuming the POJO has an ID + * which distinguishes it from others of its kind. + * + * This resource is expected to NOT block for POSTs, and is instead expected to make a best effort at returning + * as quickly as possible. Thus, returning ACCEPTED instead of OK is normal for POST methods here. + * + * Items tagged with a particular ID for an announcement listener are updated by a POST to the announcement listener's + * path "/{announcement}" + * + * Discovery of who can listen to particular announcement keys is not part of this class and should be handled + * by ListenerResourceAnnouncer + */ +public abstract class ListenerResource +{ + public static final String BASE_PATH = "/druid/listen/v1"; + private static final Logger log = new Logger(ListenerResource.class); + + private final ObjectMapper jsonMapper; + private final ObjectMapper smileMapper; + private final ListenerHandler handler; + + public ListenerResource( + final @Json ObjectMapper jsonMapper, + final @Smile ObjectMapper smileMapper, + final ListenerHandler handler + ) + { + this.jsonMapper = Preconditions.checkNotNull(jsonMapper); + this.smileMapper = smileMapper; + this.handler = handler; + } + + @POST + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public Response serviceAnnouncementPOSTAll( + final InputStream inputStream, + final @Context HttpServletRequest req // used only to get request content-type + ) + { + final String reqContentType = req.getContentType(); + final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType); + final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper; + try { + return handler.handlePOSTAll(inputStream, mapper); + } + catch (Exception e) { + log.error(e, "Exception in handling POSTAll request"); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @GET + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public Response getAll() + { + try { + return handler.handleGETAll(); + } + catch (Exception e) { + log.error(e, "Exception in handling GETAll request"); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @Path("/{id}") + @GET + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public Response serviceAnnouncementGET( + final @PathParam("id") String id + ) + { + if (Strings.isNullOrEmpty(id)) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ServletResourceUtils.sanitizeException( + new IllegalArgumentException( + "Cannot have null or empty id" + ) + )) + .build(); + } + try { + return handler.handleGET(id); + } + catch (Exception e) { + log.error(e, "Exception in handling GET request for [%s]", id); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @Path("/{id}") + @DELETE + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public Response serviceAnnouncementDELETE( + final @PathParam("id") String id + ) + { + if (Strings.isNullOrEmpty(id)) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(ServletResourceUtils.sanitizeException( + new IllegalArgumentException( + "Cannot have null or empty id" + ) + )) + .build(); + } + try { + return handler.handleDELETE(id); + } + catch (Exception e) { + log.error(e, "Exception in handling DELETE request for [%s]", id); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } + + @Path("/{id}") + @POST + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public Response serviceAnnouncementPOST( + final @PathParam("id") String id, + final InputStream inputStream, + final @Context HttpServletRequest req // used only to get request content-type + ) + { + final String reqContentType = req.getContentType(); + final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType); + final ObjectMapper mapper = isSmile ? smileMapper : jsonMapper; + try { + return handler.handlePOST(inputStream, mapper); + } + catch (Exception e) { + log.error(e, "Exception in handling POST request for ID [%s]", id); + return Response.serverError().entity(ServletResourceUtils.sanitizeException(e)).build(); + } + } +} diff --git a/server/src/test/java/io/druid/server/listener/announcer/ListenerDiscovererTest.java b/server/src/test/java/io/druid/server/listener/announcer/ListenerDiscovererTest.java new file mode 100644 index 000000000000..c2392006f5d0 --- /dev/null +++ b/server/src/test/java/io/druid/server/listener/announcer/ListenerDiscovererTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.announcer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.io.Closer; +import io.druid.curator.CuratorTestBase; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.x.discovery.ServiceCache; +import org.apache.curator.x.discovery.ServiceCacheBuilder; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.easymock.EasyMock; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadFactory; + +public class ListenerDiscovererTest extends CuratorTestBase +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private final ServiceDiscovery serviceDiscovery = EasyMock.createStrictMock(ServiceDiscovery.class); + private final String listen_key = "listener_key"; + + @Test + public void testGetNodes() throws Exception + { + final ServiceCache serviceCache = EasyMock.createStrictMock(ServiceCache.class); + final ServiceCache serviceCacheEmpty = EasyMock.createStrictMock(ServiceCache.class); + final ServiceCacheBuilder serviceCacheBuilder = EasyMock.createStrictMock(ServiceCacheBuilder.class); + final ServiceCacheBuilder serviceCacheBuilderEmpty = EasyMock.createStrictMock(ServiceCacheBuilder.class); + final ListenerDiscoverer listenerDiscoverer = new ListenerDiscoverer(serviceDiscovery); + + // Lazy + serviceDiscovery.start(); + EasyMock.expectLastCall().once(); + EasyMock.replay(serviceDiscovery); + listenerDiscoverer.start(); + EasyMock.verify(serviceDiscovery); + + final String host = "local.hostname"; + final int port = 999; + final ServiceInstance serviceInstance = ServiceInstance.builder().address(host) + .port(port) + .name(listen_key) + .build(); + final List> serviceInstanceList = ImmutableList.of(serviceInstance); + + serviceCache.start(); + EasyMock.expectLastCall().once(); + EasyMock.expect(serviceCache.getInstances()).andReturn(serviceInstanceList).once(); + + + EasyMock.expect(serviceCacheBuilder.name(EasyMock.eq(listen_key))).andReturn(serviceCacheBuilder).once(); + EasyMock.expect(serviceCacheBuilder.threadFactory(EasyMock.anyObject())) + .andReturn(serviceCacheBuilder) + .once(); + EasyMock.expect(serviceCacheBuilder.build()).andReturn(serviceCache).once(); + + + final String not_listen_key = "NOTlistener_key"; + EasyMock.expect(serviceCacheBuilder.name(EasyMock.eq(not_listen_key))).andReturn(serviceCacheBuilderEmpty).once(); + EasyMock.expect(serviceCacheBuilderEmpty.threadFactory(EasyMock.anyObject())) + .andReturn(serviceCacheBuilderEmpty) + .once(); + EasyMock.expect(serviceCacheBuilderEmpty.build()).andReturn(serviceCacheEmpty).once(); + serviceCacheEmpty.start(); + EasyMock.expectLastCall().once(); + EasyMock.expect(serviceCacheEmpty.getInstances()).andReturn(ImmutableList.>of()).once(); + + EasyMock.reset(serviceDiscovery); + EasyMock.expect(serviceDiscovery.serviceCacheBuilder()).andReturn(serviceCacheBuilder).times(2); + + EasyMock.replay(serviceDiscovery, serviceCacheBuilder, serviceCache, serviceCacheBuilderEmpty, serviceCacheEmpty); + + final Collection nodes = listenerDiscoverer.getNodes(listen_key); + final Collection no_nodes = listenerDiscoverer.getNodes(not_listen_key); + + EasyMock.verify(serviceDiscovery, serviceCacheBuilder, serviceCache, serviceCacheBuilderEmpty, serviceCacheEmpty); + + Assert.assertEquals(1, nodes.size()); + final DruidNode node = Iterables.getFirst(nodes, null); + Assert.assertNotNull(node); + Assert.assertEquals(host, node.getHost()); + Assert.assertEquals(port, node.getPort()); + + Assert.assertTrue(no_nodes.isEmpty()); + } + + + @Test + public void testStartError() throws Exception + { + final RuntimeException ex = new RuntimeException("test exception"); + serviceDiscovery.start(); + EasyMock.expectLastCall().andThrow(ex).once(); + expectedException.expect(new BaseMatcher() + { + @Override + public boolean matches(Object o) + { + return o == ex; + } + + @Override + public void describeTo(Description description) + { + + } + }); + EasyMock.replay(serviceDiscovery); + final ListenerDiscoverer listenerDiscoverer = new ListenerDiscoverer(serviceDiscovery); + listenerDiscoverer.start(); + EasyMock.verify(serviceDiscovery); + } + + + @Test + public void testStopError() throws Exception + { + final IOException ex = new IOException("test exception"); + serviceDiscovery.start(); + EasyMock.expectLastCall().once(); + serviceDiscovery.close(); + EasyMock.expectLastCall().andThrow(ex).once(); + expectedException.expectCause(new BaseMatcher() + { + @Override + public boolean matches(Object o) + { + return o == ex; + } + + @Override + public void describeTo(Description description) + { + + } + }); + EasyMock.replay(serviceDiscovery); + final ListenerDiscoverer listenerDiscoverer = new ListenerDiscoverer(serviceDiscovery); + listenerDiscoverer.start(); + listenerDiscoverer.stop(); + EasyMock.verify(serviceDiscovery); + } + + @Test(timeout = 5_000) + public void testFullService() throws Exception + { + setupServerAndCurator(); + final Closer closer = Closer.create(); + try { + closer.register(server); + closer.register(curator); + curator.start(); + curator.blockUntilConnected(); + final ListeningAnnouncerConfig config = new ListeningAnnouncerConfig(new ZkPathsConfig()); + final ListenerDiscoverer listenerDiscoverer = new ListenerDiscoverer(curator, config); + listenerDiscoverer.start(); + closer.register(new Closeable() + { + @Override + public void close() throws IOException + { + listenerDiscoverer.stop(); + } + }); + Assert.assertTrue(listenerDiscoverer.getNodes(listen_key).isEmpty()); + + final DruidNode node = new DruidNode(listen_key, "someHost", 8888); + final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer( + curator, + config, + listen_key, + node + ) + { + }; + listenerResourceAnnouncer.start(); + closer.register(new Closeable() + { + @Override + public void close() throws IOException + { + listenerResourceAnnouncer.stop(); + } + }); + + // Have to wait for background syncing + while (listenerDiscoverer.getNodes(listen_key).isEmpty()) { + // Will timeout at test's timeout setting + Thread.sleep(1); + } + Assert.assertEquals(ImmutableList.of(node), listenerDiscoverer.getNodes(listen_key)); + } + catch (Throwable t) { + throw closer.rethrow(t); + } + finally { + closer.close(); + } + } +} diff --git a/server/src/test/java/io/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java b/server/src/test/java/io/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java new file mode 100644 index 000000000000..ba8e19e29a86 --- /dev/null +++ b/server/src/test/java/io/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.announcer; + +import io.druid.curator.CuratorTestBase; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ZkPathsConfig; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.x.discovery.ServiceDiscovery; +import org.apache.curator.x.discovery.ServiceInstance; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class ListenerResourceAnnouncerTest extends CuratorTestBase +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testAnnouncerBehaves() throws Exception + { + setupServerAndCurator(); + final String listener_key = "listener_key"; + final ListeningAnnouncerConfig config = new ListeningAnnouncerConfig(new ZkPathsConfig()); + curator.start(); + try { + Assert.assertTrue(curator.blockUntilConnected(10, TimeUnit.SECONDS)); + final DruidNode node = new DruidNode("test_service", "localhost", -1); + final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer( + curator, + config, + listener_key, + node + ) + { + }; + listenerResourceAnnouncer.start(); + final String path = config.getAnnouncementPath(listener_key); + Assert.assertNotNull(curator.checkExists().forPath(path)); + Assert.assertNull(curator.checkExists().forPath(config.getAnnouncementPath(listener_key + "FOO"))); + listenerResourceAnnouncer.stop(); + listenerResourceAnnouncer.start(); + listenerResourceAnnouncer.start(); + listenerResourceAnnouncer.stop(); + listenerResourceAnnouncer.stop(); + listenerResourceAnnouncer.start(); + listenerResourceAnnouncer.stop(); + listenerResourceAnnouncer.start(); + listenerResourceAnnouncer.stop(); + // Curator atomicity sucks + // Assert.assertNull(curator.checkExists().forPath(path)); + } + finally { + try { + curator.close(); + } + finally { + tearDownServerAndCurator(); + } + } + Assert.assertEquals(CuratorFrameworkState.STOPPED, curator.getState()); + } + + @Test + public void testStartCorrect() throws Exception + { + final ServiceDiscovery discovery = EasyMock.createStrictMock(ServiceDiscovery.class); + final String listener_key = "listener_key_thing"; + final DruidNode node = new DruidNode("some_service", "some_host", -1); + + Capture> serviceInstanceCapture = Capture.newInstance(); + discovery.start(); + EasyMock.expectLastCall().once(); + discovery.registerService(EasyMock.capture(serviceInstanceCapture)); + EasyMock.expectLastCall().once(); + EasyMock.replay(discovery); + + final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer( + discovery, + listener_key, + node + ) + { + }; + resourceAnnouncer.start(); + Assert.assertTrue(serviceInstanceCapture.hasCaptured()); + final ServiceInstance capturedService = serviceInstanceCapture.getValue(); + Assert.assertEquals(node.getHost(), capturedService.getAddress()); + Assert.assertEquals(node.getPort(), (int) capturedService.getPort()); + Assert.assertEquals(listener_key, capturedService.getName()); + EasyMock.verify(discovery); + } + + @Test + public void testStopError() throws Exception + { + final ServiceDiscovery discovery = EasyMock.createStrictMock(ServiceDiscovery.class); + final String listener_key = "listener_key_thing"; + final DruidNode node = new DruidNode("some_service", "some_host", -1); + final IOException ex = new IOException("test exception"); + + + discovery.start(); + EasyMock.expectLastCall().once(); + discovery.registerService(EasyMock.>anyObject()); + EasyMock.expectLastCall().once(); + discovery.close(); + EasyMock.expectLastCall().andThrow(ex).once(); + + expectedException.expectCause(new BaseMatcher() + { + @Override + public boolean matches(Object o) + { + return o == ex; + } + + @Override + public void describeTo(Description description) + { + + } + }); + EasyMock.replay(discovery); + + final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer( + discovery, + listener_key, + node + ) + { + }; + + try { + resourceAnnouncer.start(); + resourceAnnouncer.stop(); + } + finally { + EasyMock.verify(discovery); + } + } + + @Test + public void testStartError() throws Exception + { + final ServiceDiscovery discovery = EasyMock.createStrictMock(ServiceDiscovery.class); + final String listener_key = "listener_key_thing"; + final DruidNode node = new DruidNode("some_service", "some_host", -1); + final RuntimeException ex = new RuntimeException("test exception"); + + discovery.start(); + EasyMock.expectLastCall().andThrow(ex).once(); + expectedException.expect(new BaseMatcher() + { + @Override + public boolean matches(Object o) + { + return o == ex; + } + + @Override + public void describeTo(Description description) + { + + } + }); + EasyMock.replay(discovery); + final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer( + discovery, + listener_key, + node + ) + { + }; + try { + resourceAnnouncer.start(); + } + finally { + EasyMock.verify(discovery); + } + } +} diff --git a/server/src/test/java/io/druid/server/listener/resource/AbstractListenerHandlerTest.java b/server/src/test/java/io/druid/server/listener/resource/AbstractListenerHandlerTest.java new file mode 100644 index 000000000000..56cb7d957ed3 --- /dev/null +++ b/server/src/test/java/io/druid/server/listener/resource/AbstractListenerHandlerTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.resource; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.StringUtils; +import io.druid.jackson.DefaultObjectMapper; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import javax.ws.rs.core.Response; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AbstractListenerHandlerTest +{ + final ObjectMapper mapper = new DefaultObjectMapper(); + final AtomicBoolean failPost = new AtomicBoolean(false); + final String error_msg = "err message"; + + final Object good_object = new Object(); + final AtomicBoolean shouldFail = new AtomicBoolean(false); + final AtomicBoolean returnEmpty = new AtomicBoolean(false); + final String error_message = "some error message"; + final String good_id = "good id"; + final String error_id = "error id"; + final Collection all = ImmutableList.of(); + + + final Object obj = new Object(); + final String valid_id = "some_id"; + + final AbstractListenerHandler abstractListenerHandler = + new AbstractListenerHandler(SomeBeanClass.TYPE_REFERENCE) + { + @Nullable + @Override + public Object post(@NotNull List inputObject) throws Exception + { + if (failPost.get()) { + throw new Exception(error_msg); + } + return inputObject.isEmpty() ? null : inputObject; + } + + @Nullable + @Override + protected Object get(@NotNull String id) + { + if (error_id.equals(id)) { + throw new RuntimeException(error_message); + } + return good_id.equals(id) ? good_object : null; + } + + @Nullable + @Override + protected Collection getAll() + { + if (shouldFail.get()) { + throw new RuntimeException(error_message); + } + return returnEmpty.get() ? null : all; + } + + @Nullable + @Override + protected Object delete(@NotNull String id) + { + if (error_id.equals(id)) { + throw new RuntimeException(error_msg); + } + return valid_id.equals(id) ? obj : null; + } + }; + + @Before + public void setUp() + { + mapper.registerSubtypes(SomeBeanClass.class); + } + + @Test + public void testSimple() throws Exception + { + final SomeBeanClass val = new SomeBeanClass("a"); + final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(val))); + final Response response = abstractListenerHandler.handlePOST(bais, mapper); + Assert.assertEquals(202, response.getStatus()); + Assert.assertEquals(Collections.singletonList(val), response.getEntity()); + } + + + @Test + public void testSimpleAll() throws Exception + { + final Collection val = Collections.singletonList(new SomeBeanClass("a")); + final ByteArrayInputStream bais = new ByteArrayInputStream( + StringUtils.toUtf8( + mapper.writeValueAsString( + val + ) + ) + ); + final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); + Assert.assertEquals(202, response.getStatus()); + Assert.assertEquals(val, response.getEntity()); + } + + @Test + public void testMissingAll() throws Exception + { + final Collection val = ImmutableList.of(); + final ByteArrayInputStream bais = new ByteArrayInputStream( + StringUtils.toUtf8( + mapper.writeValueAsString( + val + ) + ) + ); + final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); + Assert.assertEquals(404, response.getStatus()); + } + + @Test + public void testErrorAll() throws Exception + { + final Collection val = ImmutableList.of(); + final ByteArrayInputStream bais = new ByteArrayInputStream( + StringUtils.toUtf8( + mapper.writeValueAsString( + val + ) + ) + ); + failPost.set(true); + final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity()); + } + + @Test + public void testError() throws Exception + { + final ByteArrayInputStream bais = new ByteArrayInputStream(StringUtils.toUtf8(mapper.writeValueAsString(new SomeBeanClass( + "a")))); + failPost.set(true); + final Response response = abstractListenerHandler.handlePOST(bais, mapper); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity()); + } + + @Test + public void testBadInput() throws Exception + { + final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{0, 0, 0}); + final Response response = abstractListenerHandler.handlePOST(bais, mapper); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testBadInnerInput() throws Exception + { + final ByteArrayInputStream bais = new ByteArrayInputStream(new byte[]{}); + final ObjectMapper mapper = EasyMock.createStrictMock(ObjectMapper.class); + EasyMock.expect(mapper.readValue(EasyMock.anyObject(), EasyMock.>anyObject())).andThrow(new IOException()); + EasyMock.replay(mapper); + final Response response = abstractListenerHandler.handlePOSTAll(bais, mapper); + Assert.assertEquals(400, response.getStatus()); + EasyMock.verify(mapper); + } + + + @Test + public void testHandleSimpleDELETE() throws Exception + { + final Response response = abstractListenerHandler.handleDELETE(valid_id); + Assert.assertEquals(202, response.getStatus()); + Assert.assertEquals(obj, response.getEntity()); + } + + @Test + public void testMissingDELETE() throws Exception + { + final Response response = abstractListenerHandler.handleDELETE("not going to find it"); + Assert.assertEquals(404, response.getStatus()); + } + + @Test + public void testErrorDELETE() throws Exception + { + final Response response = abstractListenerHandler.handleDELETE(error_id); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", error_msg), response.getEntity()); + } + + @Test + public void testHandle() throws Exception + { + final Response response = abstractListenerHandler.handleGET(good_id); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(good_object, response.getEntity()); + } + + @Test + public void testMissingHandle() throws Exception + { + final Response response = abstractListenerHandler.handleGET("neva gonna get it"); + Assert.assertEquals(404, response.getStatus()); + } + + @Test + public void testExceptionalHandle() throws Exception + { + final Response response = abstractListenerHandler.handleGET(error_id); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity()); + } + + @Test + public void testHandleAll() throws Exception + { + final Response response = abstractListenerHandler.handleGETAll(); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(all, response.getEntity()); + } + + @Test + public void testExceptionalHandleAll() throws Exception + { + shouldFail.set(true); + final Response response = abstractListenerHandler.handleGETAll(); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("error", error_message), response.getEntity()); + } + + @Test + public void testMissingHandleAll() throws Exception + { + returnEmpty.set(true); + final Response response = abstractListenerHandler.handleGETAll(); + Assert.assertEquals(404, response.getStatus()); + } +} diff --git a/server/src/test/java/io/druid/server/listener/resource/ListenerResourceTest.java b/server/src/test/java/io/druid/server/listener/resource/ListenerResourceTest.java new file mode 100644 index 000000000000..53390fa0b54b --- /dev/null +++ b/server/src/test/java/io/druid/server/listener/resource/ListenerResourceTest.java @@ -0,0 +1,549 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.listener.resource; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteSource; +import com.metamx.common.StringUtils; +import io.druid.jackson.DefaultObjectMapper; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.validation.constraints.NotNull; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + + +public class ListenerResourceTest +{ + static final String ANN_ID = "announce_id"; + HttpServletRequest req; + final ObjectMapper mapper = new DefaultObjectMapper(); + private static final ByteSource EMPTY_JSON_LIST = new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return new ByteArrayInputStream(StringUtils.toUtf8("[]")); + } + }; + + @Before + public void setUp() throws Exception + { + mapper.registerSubtypes(SomeBeanClass.class); + req = EasyMock.createNiceMock(HttpServletRequest.class); + EasyMock.expect(req.getContentType()).andReturn(MediaType.APPLICATION_JSON).anyTimes(); + EasyMock.replay(req); + } + + @After + public void tearDown() throws Exception + { + + } + + @Test + public void testServiceAnnouncementPOSTExceptionInHandler() throws Exception + { + final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); + EasyMock.expect(handler.handlePOST(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(new RuntimeException("test")); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + EasyMock.replay(handler); + Assert.assertEquals( + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), + resource.serviceAnnouncementPOST("id", EMPTY_JSON_LIST.openStream(), req).getStatus() + ); + EasyMock.verify(req, handler); + } + + @Test + public void testServiceAnnouncementPOSTAllExceptionInHandler() throws Exception + { + final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); + EasyMock.expect(handler.handlePOSTAll(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(new RuntimeException("test")); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + EasyMock.replay(handler); + Assert.assertEquals( + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), + resource.serviceAnnouncementPOSTAll(EMPTY_JSON_LIST.openStream(), req).getStatus() + ); + EasyMock.verify(req, handler); + } + + @Test + public void testServiceAnnouncementPOST() throws Exception + { + final AtomicInteger c = new AtomicInteger(0); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + new ExceptionalAbstractListenerHandler() + { + @Override + public Object post(List l) + { + c.incrementAndGet(); + return l; + } + } + ) + { + }; + Assert.assertEquals( + 202, + resource.serviceAnnouncementPOSTAll(EMPTY_JSON_LIST.openStream(), req).getStatus() + ); + Assert.assertEquals(1, c.get()); + EasyMock.verify(req); + } + + @Test + public void testServiceAnnouncementGET() throws Exception + { + final AtomicInteger c = new AtomicInteger(0); + final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() + { + @Override + public Object get(String id) + { + c.incrementAndGet(); + return ANN_ID.equals(id) ? ANN_ID : null; + } + }; + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + Assert.assertEquals( + Response.Status.OK.getStatusCode(), + resource.serviceAnnouncementGET(ANN_ID).getStatus() + ); + Assert.assertEquals(1, c.get()); + EasyMock.verify(req); + } + + + @Test + public void testServiceAnnouncementGETNull() throws Exception + { + final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler(); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + Assert.assertEquals( + 400, + resource.serviceAnnouncementGET(null).getStatus() + ); + Assert.assertEquals( + 400, + resource.serviceAnnouncementGET("").getStatus() + ); + EasyMock.verify(req); + } + + @Test + public void testServiceAnnouncementGETExceptionInHandler() throws Exception + { + final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); + EasyMock.expect(handler.handleGET(EasyMock.anyString())).andThrow(new RuntimeException("test")); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + EasyMock.replay(handler); + Assert.assertEquals( + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), + resource.serviceAnnouncementGET("id").getStatus() + ); + EasyMock.verify(handler); + } + + @Test + public void testServiceAnnouncementGETAllExceptionInHandler() throws Exception + { + final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); + EasyMock.expect(handler.handleGETAll()).andThrow(new RuntimeException("test")); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + EasyMock.replay(handler); + Assert.assertEquals( + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), + resource.getAll().getStatus() + ); + EasyMock.verify(handler); + } + + @Test + public void testServiceAnnouncementDELETENullID() throws Exception + { + final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler(); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + + Assert.assertEquals( + Response.Status.BAD_REQUEST.getStatusCode(), + resource.serviceAnnouncementDELETE(null).getStatus() + ); + } + + @Test + public void testServiceAnnouncementDELETEExceptionInHandler() throws Exception + { + + final ListenerHandler handler = EasyMock.createStrictMock(ListenerHandler.class); + EasyMock.expect(handler.handleDELETE(EasyMock.anyString())).andThrow(new RuntimeException("test")); + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + EasyMock.replay(handler); + Assert.assertEquals( + Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), + resource.serviceAnnouncementDELETE("id").getStatus() + ); + EasyMock.verify(handler); + } + + @Test + public void testServiceAnnouncementDELETE() throws Exception + { + final AtomicInteger c = new AtomicInteger(0); + final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() + { + @Override + public Object delete(String id) + { + c.incrementAndGet(); + return ANN_ID.equals(id) ? ANN_ID : null; + } + }; + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + Assert.assertEquals( + 202, + resource.serviceAnnouncementDELETE(ANN_ID).getStatus() + ); + Assert.assertEquals(1, c.get()); + EasyMock.verify(req); + } + + @Test + // Take a list of strings wrap them in a JSON POJO and get them back as an array string in the POST function + public void testAbstractPostHandler() throws Exception + { + final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() + { + + @Nullable + @Override + public String post( + @NotNull List inputObject + ) throws Exception + { + return Arrays.deepToString( + Collections2.transform( + inputObject, new Function() + { + @Nullable + @Override + public String apply( + @Nullable SomeBeanClass input + ) + { + if (input == null) { + throw new NullPointerException(); + } + return input.getP(); + } + } + ).toArray() + ); + } + }; + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + final List strings = ImmutableList.of("test1", "test2"); + final Response response = resource.serviceAnnouncementPOSTAll( + new ByteArrayInputStream( + StringUtils.toUtf8( + mapper.writeValueAsString( + ImmutableList.copyOf( + Collections2.transform( + strings, + new Function() + { + @Nullable + @Override + public SomeBeanClass apply(String input) + { + return new SomeBeanClass(input); + } + } + ) + ) + ) + ) + ), + req + ); + Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus()); + Assert.assertEquals(Arrays.deepToString(strings.toArray()), response.getEntity()); + } + + + @Test + public void testAbstractPostHandlerEmptyList() throws Exception + { + final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() + { + @Override + public String post(List inputObject) throws Exception + { + return Arrays.deepToString( + Collections2.transform( + inputObject, new Function() + { + @Nullable + @Override + public String apply( + @Nullable SomeBeanClass input + ) + { + throw new UnsupportedOperationException("Shouldn't have made it here"); + } + } + ).toArray() + ); + } + }; + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + final Response response = resource.serviceAnnouncementPOSTAll( + EMPTY_JSON_LIST.openStream(), + req + ); + Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), response.getStatus()); + Assert.assertEquals("[]", response.getEntity()); + } + + + @Test + public void testAbstractPostHandlerException() throws Exception + { + final AbstractListenerHandler handler = new ExceptionalAbstractListenerHandler() + { + + @Override + public String post(List inputObject) throws Exception + { + throw new UnsupportedOperationException("nope!"); + } + }; + final ListenerResource resource = new ListenerResource( + mapper, + mapper, + handler + ) + { + }; + final List strings = ImmutableList.of("test1", "test2"); + final Response response = resource.serviceAnnouncementPOSTAll( + new ByteArrayInputStream( + StringUtils.toUtf8( + mapper.writeValueAsString( + Collections2.transform( + strings, new Function() + { + @Nullable + @Override + public SomeBeanClass apply(String input) + { + return new SomeBeanClass(input); + } + } + ) + ) + ) + ), + req + ); + Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); + } +} + +@JsonTypeName("someBean") +class SomeBeanClass +{ + protected static final TypeReference TYPE_REFERENCE = new TypeReference() + { + }; + + private final String p; + + @JsonCreator + public SomeBeanClass( + @JsonProperty("p") String p + ) + { + this.p = p; + } + + @JsonProperty + public String getP() + { + return this.p; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SomeBeanClass that = (SomeBeanClass) o; + + return p != null ? p.equals(that.p) : that.p == null; + + } + + @Override + public int hashCode() + { + return p != null ? p.hashCode() : 0; + } + + @Override + public String toString() + { + return "SomeBeanClass{" + + "p='" + p + '\'' + + '}'; + } +} + +class ExceptionalAbstractListenerHandler extends AbstractListenerHandler +{ + public ExceptionalAbstractListenerHandler() + { + super(SomeBeanClass.TYPE_REFERENCE); + } + + @Nullable + @Override + protected Object delete(@NotNull String id) + { + throw new UnsupportedOperationException("should not have called DELETE"); + } + + @Nullable + @Override + protected Object get(@NotNull String id) + { + throw new UnsupportedOperationException("should not have called GET"); + } + + @Nullable + @Override + protected Collection getAll() + { + throw new UnsupportedOperationException("should not have called GET all"); + } + + @Nullable + @Override + public Object post(@NotNull List inputObject) throws Exception + { + throw new UnsupportedOperationException("should not have called post"); + } +}