diff --git a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 0dfa685d3bd7..598153ec778a 100644 --- a/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -58,7 +58,7 @@ public static void closeStreamsQuietly(S3Object s3Obj) public static T retryS3Operation(Callable f) throws IOException, S3ServiceException, InterruptedException { int nTry = 0; - final int maxTries = 3; + final int maxTries = 10; while (true) { try { nTry++; @@ -89,13 +89,10 @@ public static T retryS3Operation(Callable f) throws IOException, S3Servic private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException { final long baseSleepMillis = 1000; - final double fuzziness = 0.2; - final long sleepMillis = Math.max( - baseSleepMillis, - (long) (baseSleepMillis * Math.pow(2, nTry) * - (1 + new Random().nextGaussian() * fuzziness)) - ); - log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis); + final long maxSleepMillis = 60000; + final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2); + final long sleepMillis = (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier); + log.warn("S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis); Thread.sleep(sleepMillis); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 74893edbf020..35abfcaf866a 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -98,6 +98,8 @@ public class RealtimePlumberSchool implements PlumberSchool private final IndexGranularity segmentGranularity; private final Object handoffCondition = new Object(); + private volatile boolean shuttingDown = false; + @JacksonInject @NotNull private volatile ServiceEmitter emitter; @@ -410,14 +412,16 @@ public void doRun() log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) .addData("interval", interval) .emit(); + if (shuttingDown) { + // We're trying to shut down, and this segment failed to push. Let's just get rid of it. + abandonSegment(truncatedTime, sink); + } } if (mergedFile != null) { try { - if (mergedFile != null) { - log.info("Deleting Index File[%s]", mergedFile); - FileUtils.deleteDirectory(mergedFile); - } + log.info("Deleting Index File[%s]", mergedFile); + FileUtils.deleteDirectory(mergedFile); } catch (IOException e) { log.warn(e, "Error deleting directory[%s]", mergedFile); @@ -433,6 +437,8 @@ public void finishJob() { log.info("Shutting down..."); + shuttingDown = true; + for (final Map.Entry entry : sinks.entrySet()) { persistAndMerge(entry.getKey(), entry.getValue()); } @@ -613,26 +619,7 @@ public ServerView.CallbackAction segmentAdded(DruidServer server, DataSegment se final String sinkVersion = sink.getSegment().getVersion(); if (segmentVersion.compareTo(sinkVersion) >= 0) { log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion); - try { - segmentAnnouncer.unannounceSegment(sink.getSegment()); - FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); - log.info("Removing sinkKey %d for segment %s", sinkKey, sink.getSegment().getIdentifier()); - sinks.remove(sinkKey); - sinkTimeline.remove( - sink.getInterval(), - sink.getVersion(), - new SingleElementPartitionChunk(sink) - ); - - synchronized (handoffCondition) { - handoffCondition.notifyAll(); - } - } - catch (IOException e) { - log.makeAlert(e, "Unable to delete old segment for dataSource[%s].", schema.getDataSource()) - .addData("interval", sink.getInterval()) - .emit(); - } + abandonSegment(sinkKey, sink); } } } @@ -706,6 +693,31 @@ public ScheduledExecutors.Signal doCall() } ); } + + /** + * Unannounces a given sink and removes all local references to it. + */ + private void abandonSegment(final long truncatedTime, final Sink sink) { + try { + segmentAnnouncer.unannounceSegment(sink.getSegment()); + FileUtils.deleteDirectory(computePersistDir(schema, sink.getInterval())); + log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); + sinks.remove(truncatedTime); + sinkTimeline.remove( + sink.getInterval(), + sink.getVersion(), + new SingleElementPartitionChunk<>(sink) + ); + synchronized (handoffCondition) { + handoffCondition.notifyAll(); + } + } + catch (IOException e) { + log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .emit(); + } + } }; }