diff --git a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java index 15e03c06d867..ed4dcea4c36d 100644 --- a/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java +++ b/extensions-contrib/graphite-emitter/src/main/java/io/druid/emitter/graphite/GraphiteEmitter.java @@ -136,16 +136,14 @@ public void emit(Event event) private class ConsumerRunnable implements Runnable { - private PickledGraphite pickledGraphite = new PickledGraphite( - graphiteEmitterConfig.getHostname(), - graphiteEmitterConfig.getPort(), - graphiteEmitterConfig.getBatchSize() - ); - @Override public void run() { - try { + try (PickledGraphite pickledGraphite = new PickledGraphite( + graphiteEmitterConfig.getHostname(), + graphiteEmitterConfig.getPort(), + graphiteEmitterConfig.getBatchSize() + )) { if (!pickledGraphite.isConnected()) { log.info("trying to connect to graphite server"); pickledGraphite.connect(); @@ -174,12 +172,16 @@ public void run() log.error(e, e.getMessage()); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); + break; } else if (e instanceof SocketException) { + // This is antagonistic to general Closeable contract in Java, + // it is needed to allow re-connection in case of the socket is closed due long period of inactivity + pickledGraphite.close(); + log.warn("Trying to re-connect to graphite server"); pickledGraphite.connect(); } } } - pickledGraphite.flush(); } catch (Exception e) { log.error(e, e.getMessage());