From 53deaac74e231a5f170d3e35bdee376fa6c95e25 Mon Sep 17 00:00:00 2001 From: Artiom Darie Date: Wed, 31 Jul 2019 10:05:11 +0300 Subject: [PATCH 1/4] Issue 8206: Fixed class cast exception in case of batch recovery --- .../util/emitter/core/HttpPostEmitter.java | 6 +- .../emitter/core/HttpPostEmitterTest.java | 79 +++++++++++++++++++ 2 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java index 85ad787bcdcf..88e6c85eedb4 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java @@ -251,8 +251,8 @@ Batch emitAndReturnBatch(Event event) while (true) { Object batchObj = concurrentBatch.get(); - if (batchObj instanceof Integer) { - tryRecoverCurrentBatch((Integer) batchObj); + if (batchObj instanceof Long) { + tryRecoverCurrentBatch((Long) batchObj); continue; } if (batchObj == null) { @@ -342,7 +342,7 @@ private void doOnSealExclusive(Batch batch, long elapsedTimeMillis) } } - private void tryRecoverCurrentBatch(Integer failedBatchNumber) + private void tryRecoverCurrentBatch(Long failedBatchNumber) { log.info("Trying to recover currentBatch"); long nextBatchNumber = ConcurrentAwaitableCounter.nextCount(failedBatchNumber); diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java new file mode 100644 index 000000000000..c2259ecc3128 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java @@ -0,0 +1,79 @@ +package org.apache.druid.java.util.emitter.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.primitives.Ints; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Test {@link HttpPostEmitter} class. + */ +public class HttpPostEmitterTest +{ + + private static final ObjectMapper objectMapper = new ObjectMapper() + { + @Override + public byte[] writeValueAsBytes(Object value) + { + return Ints.toByteArray(((IntEvent) value).index); + } + }; + + private final MockHttpClient httpClient = new MockHttpClient(); + + @Before + public void setup() + { + httpClient.setGoHandler(new GoHandler() + { + @Override + protected ListenableFuture go(Request request) + { + return GoHandlers.immediateFuture(EmitterTest.okResponse()); + } + }); + } + + + @Test(expected = ClassCastException.class) + @SuppressWarnings("unchecked") + public void testRecoveryEmitAndReturnBatch() + throws InterruptedException, IOException, NoSuchFieldException, IllegalAccessException + { + HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar") + .setFlushMillis(100) + .setFlushCount(4) + .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS) + .setMaxBatchSize(1024 * 1024) + .setBatchQueueSizeLimit(1000) + .build(); + final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper); + emitter.start(); + + // emit first event + emitter.emitAndReturnBatch(new IntEvent()); + Thread.sleep(1000L); + + // get concurrentBatch reference and set value to lon as if it would fail while + // HttpPostEmitter#onSealExclusive method invocation. + Field concurrentBatch = emitter.getClass().getDeclaredField("concurrentBatch"); + concurrentBatch.setAccessible(true); + ((AtomicReference) concurrentBatch.get(emitter)).getAndSet(1L); + // something terrible happened previously so that batch has to recover + + // emit second event + emitter.emitAndReturnBatch(new IntEvent()); + + emitter.flush(); + emitter.close(); + } + +} From aa1d5c1e003d44cf100345ca1999eb9337d35779 Mon Sep 17 00:00:00 2001 From: Artiom Darie Date: Wed, 31 Jul 2019 15:18:47 +0300 Subject: [PATCH 2/4] Issue 8206: Added HttpPostEmitterTest license header --- .../emitter/core/HttpPostEmitterTest.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java index c2259ecc3128..8d3c50d72b1c 100644 --- a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java +++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.java.util.emitter.core; import com.fasterxml.jackson.databind.ObjectMapper; @@ -12,9 +31,6 @@ import java.lang.reflect.Field; import java.util.concurrent.atomic.AtomicReference; -/** - * Test {@link HttpPostEmitter} class. - */ public class HttpPostEmitterTest { From 8d840147cb5ae1384bbbe9f2bc2159067258a500 Mon Sep 17 00:00:00 2001 From: artyomyus Date: Wed, 31 Jul 2019 22:34:44 +0300 Subject: [PATCH 3/4] Issue 8206: Updated comments accordingly to code review. --- .../apache/druid/java/util/emitter/core/Batch.java | 3 +++ .../java/util/emitter/core/HttpPostEmitter.java | 12 ++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java index 5bc598a724de..fbcfb23f0f82 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/Batch.java @@ -92,6 +92,9 @@ private static boolean isEmittingAllowed(long state) * Ordering number of this batch, as they filled & emitted in {@link HttpPostEmitter} serially, starting from 0. * It's a boxed Long rather than primitive long, because we want to minimize the number of allocations done in * {@link HttpPostEmitter#onSealExclusive} and so the probability of {@link OutOfMemoryError}. + * + * See {@link HttpPostEmitter#concurrentBatch} which may store this object. + * * @see HttpPostEmitter#onSealExclusive * @see HttpPostEmitter#concurrentBatch */ diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java index 88e6c85eedb4..1036fb965329 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/HttpPostEmitter.java @@ -109,10 +109,10 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter private final AtomicInteger approximateBuffersToReuseCount = new AtomicInteger(); /** - * concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Integer, - * it means that some thread has failed with a serious error during {@link #onSealExclusive} (with the batch number - * corresponding to the Integer object) and {@link #tryRecoverCurrentBatch} needs to be called. Otherwise (i. e. - * normally), an instance of {@link Batch} is stored in this atomic reference. + * concurrentBatch.get() == null means the service is closed. concurrentBatch.get() is the instance of Long (i. e. the + * type of {@link Batch#batchNumber}), it means that some thread has failed with a serious error during {@link + * #onSealExclusive} (with the batch number corresponding to the Long object) and {@link #tryRecoverCurrentBatch} + * needs to be called. Otherwise (i. e. normally), an instance of {@link Batch} is stored in this atomic reference. */ private final AtomicReference concurrentBatch = new AtomicReference<>(); @@ -535,8 +535,8 @@ private boolean needsToShutdown() if (batch instanceof Batch) { ((Batch) batch).sealIfFlushNeeded(); } else { - // batch == null means that HttpPostEmitter is terminated. Batch object could also be Integer, if some - // thread just failed with a serious error in onSealExclusive(), in this case we don't want to shutdown + // batch == null means that HttpPostEmitter is terminated. Batch object might also be a Long object if some + // thread just failed with a serious error in onSealExclusive(). In this case we don't want to shutdown // the emitter thread. needsToShutdown = batch == null; } From a049e4c3e7eddf495e7ecd7998942759457e46c9 Mon Sep 17 00:00:00 2001 From: artyomyus Date: Wed, 31 Jul 2019 22:45:49 +0300 Subject: [PATCH 4/4] Issue 8206: Updated HttpPostEmitterTest accordingly to new modifications. --- .../druid/java/util/emitter/core/HttpPostEmitterTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java index 8d3c50d72b1c..a1a6a3f732f1 100644 --- a/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java +++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java @@ -24,6 +24,7 @@ import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; import org.asynchttpclient.Response; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -59,7 +60,7 @@ protected ListenableFuture go(Request request) } - @Test(expected = ClassCastException.class) + @Test @SuppressWarnings("unchecked") public void testRecoveryEmitAndReturnBatch() throws InterruptedException, IOException, NoSuchFieldException, IllegalAccessException @@ -90,6 +91,8 @@ public void testRecoveryEmitAndReturnBatch() emitter.flush(); emitter.close(); + + Assert.assertEquals(2, emitter.getTotalEmittedEvents()); } }