From 76f6baebf1f890db84bfa18a3a2788068f691d10 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 20 Apr 2019 13:11:08 -0700 Subject: [PATCH 1/2] Fix encoded taskId check in chatHandlerResource --- .../indexing/common/IndexTaskClient.java | 2 +- .../parallel/ParallelIndexSupervisorTask.java | 6 +-- .../firehose/ChatHandlerResource.java | 20 +++++++-- .../jetty/BadRequestException.java | 28 +++++++++++++ .../jetty/BadRequestExceptionMapper.java | 41 +++++++++++++++++++ .../jetty/JettyServerModule.java | 1 + 6 files changed, 89 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java create mode 100644 server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestExceptionMapper.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index 483795fc7775..ad8bb392c941 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -355,7 +355,7 @@ private FullResponseHolder submitRequest( } else if (responseCode == 400) { // don't bother retrying if it's a bad request throw new IAE("Received 400 Bad Request with body: %s", response.getContent()); } else { - throw new IOE("Received status [%d]", responseCode); + throw new IOE("Received status [%d] and content [%s]", responseCode, response.getContent()); } } catch (IOException | ChannelException e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index ecf76ee3d201..2a29726965c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -356,10 +356,8 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC @POST @Path("/segment/allocate") @Produces(SmileMediaTypes.APPLICATION_JACKSON_SMILE) - public Response allocateSegment( - DateTime timestamp, - @Context final HttpServletRequest req - ) + @Consumes(SmileMediaTypes.APPLICATION_JACKSON_SMILE) + public Response allocateSegment(DateTime timestamp, @Context final HttpServletRequest req) { ChatHandlers.authorizationCheck( req, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java index 9e64731ec9c0..98f508f0bfa2 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java @@ -20,8 +20,10 @@ package org.apache.druid.segment.realtime.firehose; import com.google.common.base.Optional; +import com.google.common.collect.Iterables; import com.google.inject.Inject; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.initialization.jetty.BadRequestException; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import javax.ws.rs.Path; @@ -49,9 +51,19 @@ public ChatHandlerResource(final ChatHandlerProvider handlers, final DataSourceT public Object doTaskChat(@PathParam("id") String handlerId, @Context HttpHeaders headers) { if (taskId != null) { - List requestTaskId = headers.getRequestHeader(TASK_ID_HEADER); - if (requestTaskId != null && !requestTaskId.contains(StringUtils.urlEncode(taskId))) { - return null; + final List requestTaskIds = headers.getRequestHeader(TASK_ID_HEADER); + final String requestTaskId = requestTaskIds != null && !requestTaskIds.isEmpty() + ? Iterables.getOnlyElement(requestTaskIds) + : null; + + // Sanity check: Callers set TASK_ID_HEADER to our taskId (URL-encoded, if >= 0.14.0) if they want to be + // assured of talking to the correct task, and not just some other task running on the same port. + if (requestTaskId != null + && !requestTaskId.equals(taskId) + && !StringUtils.urlDecode(requestTaskId).equals(taskId)) { + throw new BadRequestException( + StringUtils.format("Requested taskId[%s] doesn't match with taskId[%s]", requestTaskId, taskId) + ); } } @@ -61,6 +73,6 @@ public Object doTaskChat(@PathParam("id") String handlerId, @Context HttpHeaders return handler.get(); } - return null; + throw new BadRequestException(StringUtils.format("Can't find chatHandler for handler[%s]", handlerId)); } } diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java new file mode 100644 index 000000000000..8badcabe5604 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.initialization.jetty; + +public class BadRequestException extends RuntimeException +{ + public BadRequestException(String msg) + { + super(msg); + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestExceptionMapper.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestExceptionMapper.java new file mode 100644 index 000000000000..8a98a3d7cf89 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestExceptionMapper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.initialization.jetty; + +import com.google.common.collect.ImmutableMap; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +@Provider +public class BadRequestExceptionMapper implements ExceptionMapper +{ + @Override + public Response toResponse(BadRequestException exception) + { + return Response.status(Status.BAD_REQUEST) + .type(MediaType.APPLICATION_JSON) + .entity(ImmutableMap.of("error", exception.getMessage())) + .build(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 7bd1ffa70285..7c1ec8b56f76 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -112,6 +112,7 @@ protected void configureServlets() binder.bind(DruidGuiceContainer.class).in(Scopes.SINGLETON); binder.bind(CustomExceptionMapper.class).in(Singleton.class); binder.bind(ForbiddenExceptionMapper.class).in(Singleton.class); + binder.bind(BadRequestExceptionMapper.class).in(Singleton.class); serve("/*").with(DruidGuiceContainer.class); From c07cd9294fbb4fc6803fa9660f489b7924988941 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 20 Apr 2019 15:25:13 -0700 Subject: [PATCH 2/2] fix tests --- .../druid/indexing/kafka/KafkaIndexTaskClientTest.java | 7 ++++--- .../druid/indexing/kinesis/KinesisIndexTaskClientTest.java | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 28cc0bbe9d15..f0178b8a09a7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -184,9 +184,10 @@ public void testTaskNotRunnableException() public void testInternalServerError() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500]"); + expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []"); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2); + expect(responseHolder.getContent()).andReturn(""); expect( httpClient.go( EasyMock.anyObject(Request.class), @@ -231,7 +232,7 @@ public void testTaskLocationMismatch() expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) .andReturn(HttpResponseStatus.OK); expect(responseHolder.getResponse()).andReturn(response); - expect(responseHolder.getContent()).andReturn("") + expect(responseHolder.getContent()).andReturn("").times(2) .andReturn("{}"); expect(response.headers()).andReturn(headers); expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id"); @@ -291,7 +292,7 @@ public void testGetCurrentOffsetsWithRetry() throws Exception Capture captured = Capture.newInstance(CaptureType.ALL); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6) .andReturn(HttpResponseStatus.OK).times(1); - expect(responseHolder.getContent()).andReturn("").times(2) + expect(responseHolder.getContent()).andReturn("").times(4) .andReturn("{\"0\":1, \"1\":10}"); expect(responseHolder.getResponse()).andReturn(response).times(2); expect(response.headers()).andReturn(headers).times(2); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java index 9d7fafbb4c73..9ac25f5de6ea 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java @@ -185,9 +185,10 @@ public void testTaskNotRunnableException() public void testInternalServerError() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500]"); + expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []"); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2); + expect(responseHolder.getContent()).andReturn(""); expect( httpClient.go( EasyMock.anyObject(Request.class), @@ -232,7 +233,7 @@ public void testTaskLocationMismatch() expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) .andReturn(HttpResponseStatus.OK); expect(responseHolder.getResponse()).andReturn(response); - expect(responseHolder.getContent()).andReturn("") + expect(responseHolder.getContent()).andReturn("").times(2) .andReturn("{}"); expect(response.headers()).andReturn(headers); expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id"); @@ -292,7 +293,7 @@ public void testGetCurrentOffsetsWithRetry() throws Exception Capture captured = Capture.newInstance(CaptureType.ALL); expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6) .andReturn(HttpResponseStatus.OK).times(1); - expect(responseHolder.getContent()).andReturn("").times(2) + expect(responseHolder.getContent()).andReturn("").times(4) .andReturn("{\"0\":1, \"1\":10}"); expect(responseHolder.getResponse()).andReturn(response).times(2); expect(response.headers()).andReturn(headers).times(2);