diff --git a/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/IntervalStreamHandler.kt b/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/IntervalStreamHandler.kt index 485e58158..0a5e1a560 100644 --- a/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/IntervalStreamHandler.kt +++ b/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/IntervalStreamHandler.kt @@ -39,6 +39,7 @@ abstract class IntervalStreamHandler( val job = streamJobs[type] ?: return@runGovernor if (jobs[type] == null || jobs[type]?.isCancelled == true) { logger.info("job running for $type") + job.isRunAllowed = true jobs[type] = intervalExecutor.scheduleAtFixedRate( { job.run(type) }, 0, @@ -51,40 +52,65 @@ abstract class IntervalStreamHandler( private fun StreamJob.run(type: T) { runBlocking(AppDispatchers.websocketExecutor) { - val data = runnable() - template.convertAndSend(getPath(type), data) + if (isRunAllowed) { + val data = runnable() + template.convertAndSend(getPath(type), data) + } } } @Scheduled(fixedDelay = 60 * 1000) private fun govern() { runGovernor { - jobs.entries.forEach { j -> - val job = j.value - val count = userRegistry.findSubscriptions { it.destination == getPath(j.key) }.count() - if (count == 0) { - if (job?.isCancelled == false) { - logger.info("No subscriber for ${j.key}. task stopped") - job.cancel(false) - } - } else { - if (job == null || job.isCancelled) { - streamJobs[j.key]?.let { s -> - jobs[j.key] = intervalExecutor.scheduleAtFixedRate( - { s.run(j.key) }, - 0, - s.interval, - s.timeUnit - ) - } - logger.info("Starting job") + cancelOrphanJobs() + intervalExecutor.purge() + } + } + + private fun cancelOrphanJobs() { + jobs.entries.forEach { j -> + val job = j.value + val count = userRegistry.findSubscriptions { it.destination == getPath(j.key) }.count() + val sJob = streamJobs[j.key] + if (count == 0) { + if (sJob?.isRunAllowed == true){ + logger.info("No subscriber for ${j.key}. stopping task") + sJob.isRunAllowed = false + } + + if (job?.isCancelled == false) + job.cancel(false) + } else { + if (job == null || job.isCancelled || job.isDone) { + sJob?.let { s -> + s.isRunAllowed = true + jobs[j.key] = intervalExecutor.scheduleAtFixedRate( + { s.run(j.key) }, + 0, + s.interval, + s.timeUnit + ) } + logger.info("Starting job") } } } } - private fun runGovernor(runnable: () -> Unit) { + @Scheduled(initialDelay = 1000, fixedDelay = 60 * 1000) + private fun logStatus() { + var runningJob = 0 + jobs.entries.forEach { + if (it.value != null && (it.value?.isDone == false || it.value?.isCancelled == false)) { + runningJob++ + logger.info("+ running: ${it.key}") + } + } + + logger.info("== Total of $runningJob jobs running ==") + } + + private fun runGovernor(runnable: suspend () -> Unit) { runBlocking(governorExecutor) { runnable() } } diff --git a/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/StreamJob.kt b/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/StreamJob.kt index 7aa69eb0c..ef7dda236 100644 --- a/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/StreamJob.kt +++ b/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/service/stream/StreamJob.kt @@ -5,5 +5,6 @@ import java.util.concurrent.TimeUnit data class StreamJob( val interval: Long, val timeUnit: TimeUnit, + var isRunAllowed: Boolean = true, val runnable: suspend () -> Any ) \ No newline at end of file diff --git a/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/socket/StompEventsConfig.kt b/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/socket/StompEventsConfig.kt index a820c92ac..bc4885ca7 100644 --- a/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/socket/StompEventsConfig.kt +++ b/websocket/websocket-app/src/main/kotlin/co/nilin/opex/websocket/app/socket/StompEventsConfig.kt @@ -1,5 +1,6 @@ package co.nilin.opex.websocket.app.socket +import org.slf4j.LoggerFactory import org.springframework.context.ApplicationListener import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -9,34 +10,36 @@ import org.springframework.web.socket.messaging.* @Configuration class StompEventsConfig { + private val logger = LoggerFactory.getLogger(StompEventsConfig::class.java) + @Bean fun brokerAvailabilityListener() = ApplicationListener { event -> - println("Is broker available: ${event.isBrokerAvailable}") + logger.info("Is broker available: ${event.isBrokerAvailable}") } @Bean fun sessionConnectListener() = ApplicationListener { event -> - println("* session connect received: ${event.message}") + logger.info("* session connect received: ${event.message}") } @Bean fun sessionConnectedListener() = ApplicationListener { event -> - println("* connected: ${event.message}") + logger.info("* connected: ${event.message}") } @Bean fun sessionDisconnectedListener() = ApplicationListener { event -> - println("* disconnected: ${event.message}") + logger.info("* disconnected: ${event.message}") } @Bean fun sessionSubscribeListener() = ApplicationListener { event -> - println("* subscribed: ${event.message}") + logger.info("+ subscribed: ${event.message}") } @Bean fun sessionUnsubscribeEventListener() = ApplicationListener { event -> - println("- unsubscribed: ${event.message}") + logger.info("- unsubscribed: ${event.message}") } } \ No newline at end of file