diff --git a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt index eb89d875a..096fac5d9 100644 --- a/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-eventlistener-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/listener/config/AccountantKafkaConfig.kt @@ -1,13 +1,12 @@ package co.nilin.opex.accountant.ports.kafka.listener.config - import co.nilin.opex.accountant.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.accountant.ports.kafka.listener.consumer.OrderKafkaListener import co.nilin.opex.accountant.ports.kafka.listener.consumer.TempEventKafkaListener import co.nilin.opex.accountant.ports.kafka.listener.consumer.TradeKafkaListener import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier @@ -15,25 +14,25 @@ import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.context.support.GenericApplicationContext import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import 
org.springframework.kafka.listener.ConcurrentMessageListenerContainer -import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.* import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.util.backoff.FixedBackOff import java.util.regex.Pattern @Configuration class AccountantKafkaConfig { @Value("\${spring.kafka.bootstrap-servers}") - private val bootstrapServers: String? = null + private lateinit var bootstrapServers: String @Value("\${spring.kafka.consumer.group-id}") - private val groupId: String? = null + private lateinit var groupId: String - @Bean("accountantConsumerConfig") - fun consumerConfigs(): Map { + @Bean("consumerConfig") + fun consumerConfigs(): Map { return mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to groupId, @@ -45,7 +44,7 @@ class AccountantKafkaConfig { } @Bean("accountantConsumerFactory") - fun consumerFactory(@Qualifier("accountantConsumerConfig") consumerConfigs: Map): ConsumerFactory { + fun consumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @@ -53,12 +52,14 @@ class AccountantKafkaConfig { @ConditionalOnBean(TradeKafkaListener::class) fun configureTradeListener( tradeListener: TradeKafkaListener, + @Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate, @Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("trades_.*")) containerProps.messageListener = tradeListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("TradeKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "trades.DLT") container.start() } @@ -66,12 +67,14 @@ class AccountantKafkaConfig { 
@ConditionalOnBean(EventKafkaListener::class) fun configureEventListener( eventListener: EventKafkaListener, + @Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate, @Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("events_.*")) containerProps.messageListener = eventListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("EventKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "events.DLT") container.start() } @@ -79,32 +82,38 @@ class AccountantKafkaConfig { @ConditionalOnBean(OrderKafkaListener::class) fun configureOrderListener( orderListener: OrderKafkaListener, + @Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate, @Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("orders_.*")) containerProps.messageListener = orderListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("OrderKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "orders.DLT") container.start() } - @Autowired - fun createTempTopics(applicationContext: GenericApplicationContext) { - applicationContext.registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1) - } - @Autowired @ConditionalOnBean(TempEventKafkaListener::class) fun configureTempEventListener( eventListener: TempEventKafkaListener, + @Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate, @Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("tempevents")) containerProps.messageListener = eventListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) 
container.setBeanName("TempEventKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "tempevents.DLT") container.start() } + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { + val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> + cr.headers().add("dlt-origin-module", "ACCOUNTANT".toByteArray()) + TopicPartition(dltTopic, cr.partition()) + } + return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20)) + } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt new file mode 100644 index 000000000..2ddb2fe1e --- /dev/null +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/KafkaTopicConfig.kt @@ -0,0 +1,37 @@ +package co.nilin.opex.accountant.ports.kafka.submitter.config + +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.common.config.TopicConfig +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext +import org.springframework.kafka.config.TopicBuilder +import java.util.function.Supplier + +@Configuration +class KafkaTopicConfig { + + @Autowired + fun createTopics(applicationContext: GenericApplicationContext) { + with(applicationContext) { + registerBean("topic_richOrder", NewTopic::class.java, Supplier { + TopicBuilder.name("richOrder") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + + registerBean("topic_richTrade", NewTopic::class.java, Supplier { + 
TopicBuilder.name("richTrade") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + + registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1) + } + } + +} \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt index f0b2bdfc0..0de624e27 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/config/SubmitterKafkaConfig.kt @@ -3,22 +3,16 @@ package co.nilin.opex.accountant.ports.kafka.submitter.config import co.nilin.opex.accountant.core.inout.RichOrderEvent import co.nilin.opex.accountant.core.inout.RichTrade import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.serialization.StringSerializer -import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.context.support.GenericApplicationContext -import org.springframework.kafka.config.TopicBuilder import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory import 
org.springframework.kafka.support.serializer.JsonSerializer -import java.util.function.Supplier @Configuration class SubmitterKafkaConfig { @@ -26,7 +20,7 @@ class SubmitterKafkaConfig { @Value("\${spring.kafka.bootstrap-servers}") private lateinit var bootstrapServers: String - @Bean("accountantProducerConfigs") + @Bean("producerConfigs") fun producerConfigs(): Map { return mapOf( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, @@ -38,7 +32,7 @@ class SubmitterKafkaConfig { } @Bean("accountantEventProducerFactory") - fun producerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map): ProducerFactory { + fun producerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @@ -48,7 +42,7 @@ class SubmitterKafkaConfig { } @Bean("richTradeProducerFactory") - fun richTradeProducerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map): ProducerFactory { + fun richTradeProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @@ -58,7 +52,7 @@ class SubmitterKafkaConfig { } @Bean("richOrderProducerFactory") - fun richOrderProducerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map): ProducerFactory { + fun richOrderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @@ -66,23 +60,4 @@ class SubmitterKafkaConfig { fun richOrderKafkaTemplate(@Qualifier("richOrderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } - - @Autowired - fun createTopics(applicationContext: GenericApplicationContext) { - applicationContext.registerBean("topic_richOrder", NewTopic::class.java, Supplier { - TopicBuilder.name("richOrder") - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) 
- - applicationContext.registerBean("topic_richTrade", NewTopic::class.java, Supplier { - TopicBuilder.name("richTrade") - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) - } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt index b8893c862..05c2e7189 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichOrderSubmitter.kt @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.submitter.service import co.nilin.opex.accountant.core.inout.RichOrderEvent import co.nilin.opex.accountant.core.spi.RichOrderPublisher +import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Component @@ -13,13 +14,17 @@ import kotlin.coroutines.suspendCoroutine class RichOrderSubmitter(@Qualifier("richOrderKafkaTemplate") val kafkaTemplate: KafkaTemplate) : RichOrderPublisher { + private val logger = LoggerFactory.getLogger(RichOrderSubmitter::class.java) + override suspend fun publish(order: RichOrderEvent): Unit = suspendCoroutine { cont -> - println("richOrderSubmit!") + logger.info("Submitting RichOrder") + val sendFuture = kafkaTemplate.send("richOrder", order) - sendFuture.addCallback({ sendResult -> + sendFuture.addCallback({ cont.resume(Unit) - }, { exception -> - cont.resumeWithException(exception) + }, { + logger.error("Error submitting RichOrder", it) + 
cont.resumeWithException(it) }) } } \ No newline at end of file diff --git a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt index 31ed37b7f..e35188ada 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/RichTradeSubmitter.kt @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.submitter.service import co.nilin.opex.accountant.core.inout.RichTrade import co.nilin.opex.accountant.core.spi.RichTradePublisher +import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Component @@ -12,13 +13,18 @@ import kotlin.coroutines.suspendCoroutine @Component class RichTradeSubmitter(@Qualifier("richTradeKafkaTemplate") val kafkaTemplate: KafkaTemplate) : RichTradePublisher { + + private val logger = LoggerFactory.getLogger(RichTradeSubmitter::class.java) + override suspend fun publish(trade: RichTrade): Unit = suspendCoroutine { cont -> - println("richTradeSubmit!") + logger.info("Submitting RichTrade event: id=${trade.id}") + val sendFuture = kafkaTemplate.send("richTrade", trade) - sendFuture.addCallback({ sendResult -> + sendFuture.addCallback({ cont.resume(Unit) - }, { exception -> - cont.resumeWithException(exception) + }, { + logger.error("RichTrade submitter error", it) + cont.resumeWithException(it) }) } } \ No newline at end of file diff --git 
a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt index 2fa563725..09c184248 100644 --- a/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt +++ b/accountant/accountant-ports/accountant-submitter-kafka/src/main/kotlin/co/nilin/opex/accountant/ports/kafka/submitter/service/TempEventSubmitter.kt @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.submitter.service import co.nilin.opex.accountant.core.spi.TempEventRepublisher import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Component @@ -12,14 +13,19 @@ import kotlin.coroutines.suspendCoroutine @Component class TempEventSubmitter(@Qualifier("accountantEventKafkaTemplate") val kafkaTemplate: KafkaTemplate) : TempEventRepublisher { + + private val logger = LoggerFactory.getLogger(TempEventSubmitter::class.java) + override suspend fun republish(events: List): Unit = suspendCoroutine { cont -> - println("accountantEventSubmit!") + logger.info("Submitting TempEvents") + events.forEach { event -> val sendFuture = kafkaTemplate.send("tempevents", event) - sendFuture.addCallback({ sendResult -> + sendFuture.addCallback({ cont.resume(Unit) - }, { exception -> - cont.resumeWithException(exception) + }, { + logger.error("Error submitting TempEvents", it) + cont.resumeWithException(it) }) } } diff --git a/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/service/AdminKafkaEventPublisher.kt 
b/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/service/AdminKafkaEventPublisher.kt index d27a0c57a..2a99dd4dd 100644 --- a/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/service/AdminKafkaEventPublisher.kt +++ b/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/service/AdminKafkaEventPublisher.kt @@ -16,8 +16,14 @@ class AdminKafkaEventPublisher(private val kafkaTemplate: KafkaTemplate logger.info("Publishing admin event: $event") + val sendFuture = kafkaTemplate.send("admin_event", event) - sendFuture.addCallback({ cont.resume(Unit) }, { cont.resumeWithException(it) }) + sendFuture.addCallback({ + cont.resume(Unit) + }, { + logger.error("Error publishing admin event", it) + cont.resumeWithException(it) + }) } } \ No newline at end of file diff --git a/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/ApiKafkaConfig.kt b/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/ApiKafkaConfig.kt index 30b6b1f05..0af4a06ce 100644 --- a/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/ApiKafkaConfig.kt +++ b/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/ApiKafkaConfig.kt @@ -1,42 +1,39 @@ package co.nilin.opex.api.ports.kafka.listener.config +import co.nilin.opex.accountant.core.inout.RichOrderEvent +import co.nilin.opex.accountant.core.inout.RichTrade import co.nilin.opex.api.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.api.ports.kafka.listener.consumer.OrderKafkaListener import co.nilin.opex.api.ports.kafka.listener.consumer.TradeKafkaListener import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import org.apache.kafka.clients.admin.NewTopic import 
org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.context.support.GenericApplicationContext -import org.springframework.kafka.config.TopicBuilder -import org.springframework.kafka.core.* -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer -import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.* import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.kafka.support.serializer.JsonSerializer -import java.util.function.Supplier +import org.springframework.util.backoff.FixedBackOff import java.util.regex.Pattern @Configuration class ApiKafkaConfig { @Value("\${spring.kafka.bootstrap-servers}") - private val bootstrapServers: String? = null + private lateinit var bootstrapServers: String @Value("\${spring.kafka.consumer.group-id}") - private val groupId: String? 
= null + private lateinit var groupId: String @Bean("apiConsumerConfig") - fun consumerConfigs(): Map { + fun consumerConfigs(): Map { return mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to groupId, @@ -46,41 +43,33 @@ class ApiKafkaConfig { ) } - @Bean("apiConsumerFactory") - fun consumerFactory(@Qualifier("apiConsumerConfig") consumerConfigs: Map): ConsumerFactory { + @Bean("eventsConsumerFactory") + fun consumerFactory(@Qualifier("apiConsumerConfig") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } - @Bean("apiProducerConfig") - fun producerConfigs(): Map { - return mapOf( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, - ProducerConfig.ACKS_CONFIG to "all" - ) - } - - @Bean("apiProducerFactory") - fun producerFactory(@Qualifier("apiProducerConfig") producerConfigs: Map): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs) + @Bean("richTradeConsumerFactory") + fun richTradeConsumerFactory(@Qualifier("apiConsumerConfig") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) } - @Bean("apiKafkaTemplate") - fun kafkaTemplate(@Qualifier("apiProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { - return KafkaTemplate(producerFactory) + @Bean("richOrderConsumerFactory") + fun richOrderConsumerFactory(@Qualifier("apiConsumerConfig") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) } @Autowired @ConditionalOnBean(TradeKafkaListener::class) fun configureTradeListener( tradeListener: TradeKafkaListener, - @Qualifier("apiConsumerFactory") consumerFactory: ConsumerFactory + template: KafkaTemplate, + @Qualifier("richTradeConsumerFactory") consumerFactory: ConsumerFactory ) { val 
containerProps = ContainerProperties(Pattern.compile("richTrade")) containerProps.messageListener = tradeListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("ApiTradeKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "richTrade.DLT") container.start() } @@ -88,12 +77,14 @@ class ApiKafkaConfig { @ConditionalOnBean(EventKafkaListener::class) fun configureEventListener( eventListener: EventKafkaListener, - @Qualifier("apiConsumerFactory") consumerFactory: ConsumerFactory + template: KafkaTemplate, + @Qualifier("eventsConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("events_.*")) containerProps.messageListener = eventListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("ApiEventKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "events.DLT") container.start() } @@ -101,33 +92,23 @@ class ApiKafkaConfig { @ConditionalOnBean(OrderKafkaListener::class) fun configureOrderListener( orderListener: OrderKafkaListener, - @Qualifier("apiConsumerFactory") consumerFactory: ConsumerFactory + template: KafkaTemplate, + @Qualifier("richOrderConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("richOrder")) containerProps.messageListener = orderListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("ApiOrderKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "richOrder.DLT") container.start() } - @Autowired - fun createTopics(applicationContext: GenericApplicationContext) { - applicationContext.registerBean("topic_richOrder", NewTopic::class.java, Supplier { - TopicBuilder.name("richOrder") - .partitions(10) - .replicas(3) - 
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) - - applicationContext.registerBean("topic_richTrade", NewTopic::class.java, Supplier { - TopicBuilder.name("richTrade") - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { + val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> + cr.headers().add("dlt-origin-module", "API".toByteArray()) + TopicPartition(dltTopic, cr.partition()) + } + return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20)) } - } \ No newline at end of file diff --git a/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/KafkaProducerConfig.kt b/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/KafkaProducerConfig.kt new file mode 100644 index 000000000..d44bbd266 --- /dev/null +++ b/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -0,0 +1,63 @@ +package co.nilin.opex.api.ports.kafka.listener.config + +import co.nilin.opex.accountant.core.inout.RichOrderEvent +import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + 
+@Configuration +class KafkaProducerConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean("apiProducerConfigs") + fun producerConfigs(): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all" + ) + } + + @Bean("eventsProducerFactory") + fun producerFactory(@Qualifier("apiProducerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("eventKafkaTemplate") + fun kafkaTemplate(@Qualifier("eventsProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + + @Bean("richTradeProducerFactory") + fun richTradeProducerFactory(@Qualifier("apiProducerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("richTradeKafkaTemplate") + fun richTradeTemplate(@Qualifier("richTradeProducerFactory") factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + + @Bean("richOrderProducerFactory") + fun richOrderProducerFactory(@Qualifier("apiProducerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("richOrderKafkaTemplate") + fun richOrderTemplate(@Qualifier("richOrderProducerFactory") factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + +} \ No newline at end of file diff --git a/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/KafkaTopicConfig.kt b/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/KafkaTopicConfig.kt new file mode 100644 index 000000000..aa5b62302 --- /dev/null +++ 
b/api/api-ports/api-eventlistener-kafka/src/main/kotlin/co/nilin/opex/api/ports/kafka/listener/config/KafkaTopicConfig.kt @@ -0,0 +1,33 @@ +package co.nilin.opex.api.ports.kafka.listener.config + +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.common.config.TopicConfig +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext +import org.springframework.kafka.config.TopicBuilder +import java.util.function.Supplier + +@Configuration +class KafkaTopicConfig { + + @Autowired + fun createTopics(applicationContext: GenericApplicationContext) { + applicationContext.registerBean("topic_richOrder", NewTopic::class.java, Supplier { + TopicBuilder.name("richOrder") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + + applicationContext.registerBean("topic_richTrade", NewTopic::class.java, Supplier { + TopicBuilder.name("richTrade") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + } + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaConfig.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaConfig.kt index 19d44e68d..508af36d8 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaConfig.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaConfig.kt @@ -3,17 +3,20 @@ package co.nilin.opex.bcgateway.ports.kafka.listener.config import co.nilin.opex.bcgateway.ports.kafka.listener.consumer.AdminEventKafkaListener import 
co.nilin.opex.bcgateway.ports.kafka.listener.model.AdminEvent import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.DefaultKafkaConsumerFactory -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer -import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.* import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.util.backoff.FixedBackOff import java.util.regex.Pattern @Configuration @@ -25,7 +28,7 @@ class KafkaConfig { @Value("\${spring.kafka.consumer.group-id}") private val groupId: String? 
= null - @Bean + @Bean("consumerConfigs") fun consumerConfigs(): Map { return mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, @@ -38,7 +41,7 @@ class KafkaConfig { } @Bean - fun adminEventsConsumerFactory(consumerConfigs: Map): ConsumerFactory { + fun adminEventsConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @@ -46,13 +49,23 @@ class KafkaConfig { @ConditionalOnBean(AdminEventKafkaListener::class) fun configureAdminEventListener( listener: AdminEventKafkaListener, + template: KafkaTemplate, consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("admin_event")) containerProps.messageListener = listener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("AdminEventKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "admin_event.DLT") container.start() } + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { + val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> + cr.headers().add("dlt-origin-module", "BC_GATEWAY".toByteArray()) + TopicPartition(dltTopic, cr.partition()) + } + return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20)) + } + } \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaProducerConfig.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaProducerConfig.kt new file mode 100644 index 000000000..aaa7407c6 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -0,0 +1,42 @@ +package 
co.nilin.opex.bcgateway.ports.kafka.listener.config + +import co.nilin.opex.bcgateway.ports.kafka.listener.model.AdminEvent +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +@Configuration +class KafkaProducerConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean("producerConfigs") + fun producerConfigs(): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all", + JsonSerializer.TYPE_MAPPINGS to "admin_add_currency:co.nilin.opex.admin.core.events.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.admin.core.events.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.admin.core.events.DeleteCurrencyEvent" + ) + } + + @Bean + fun producerFactory(@Qualifier("producerConfigs") config: Map): ProducerFactory { + return DefaultKafkaProducerFactory(config) + } + + @Bean + fun kafkaTemplate(factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + +} \ No newline at end of file diff --git a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/config/AppConfig.kt b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/config/AppConfig.kt index a786f65fd..a97121c9a 100644 --- 
a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/config/AppConfig.kt +++ b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/config/AppConfig.kt @@ -1,19 +1,18 @@ package co.nilin.opex.eventlog.app.config +import co.nilin.opex.eventlog.app.listeners.DeadLetterListener +import co.nilin.opex.eventlog.app.listeners.EventlogEventListener +import co.nilin.opex.eventlog.app.listeners.EventlogTradeListener +import co.nilin.opex.eventlog.app.listeners.OrderListener +import co.nilin.opex.eventlog.core.spi.DeadLetterPersister import co.nilin.opex.eventlog.core.spi.EventPersister import co.nilin.opex.eventlog.core.spi.OrderPersister import co.nilin.opex.eventlog.core.spi.TradePersister +import co.nilin.opex.eventlog.ports.kafka.listener.consumer.DLTKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.OrderKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.TradeKafkaListener -import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequest -import co.nilin.opex.eventlog.ports.kafka.listener.spi.EventListener -import co.nilin.opex.eventlog.ports.kafka.listener.spi.OrderSubmitRequestListener -import co.nilin.opex.eventlog.ports.kafka.listener.spi.TradeListener -import co.nilin.opex.matching.engine.core.eventh.events.* import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.runBlocking -import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -33,9 +32,12 @@ class AppConfig { } @Bean - fun eventlogEventListener( - orderPersister: OrderPersister, eventPersister: EventPersister - ): EventlogEventListener { + fun deadLetterListener(persister: DeadLetterPersister): DeadLetterListener { + return DeadLetterListener(persister) + } + + @Bean + fun 
eventlogEventListener(orderPersister: OrderPersister, eventPersister: EventPersister): EventlogEventListener { return EventlogEventListener(orderPersister, eventPersister) } @@ -59,69 +61,9 @@ class AppConfig { eventKafkaListener.addEventListener(eventlogEventListener) } - class OrderListener(val orderPersister: OrderPersister) : OrderSubmitRequestListener { - - override fun id(): String { - return "OrderListener" - } - - override suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { - orderPersister.submitOrder( - SubmitOrderEvent( - order.ouid, - order.uuid, - order.orderId, - order.pair, - order.price, - order.quantity, - 0, - order.direction, - order.matchConstraint, - order.orderType - ) - ) - } - } - - class EventlogTradeListener(val tradePersister: TradePersister) : TradeListener { - - private val log = LoggerFactory.getLogger(EventlogTradeListener::class.java) - - override fun id(): String { - return "TradeListener" - } - - override fun onTrade(tradeEvent: TradeEvent, partition: Int, offset: Long, timestamp: Long) { - log.debug("Receive TradeEvent {}", tradeEvent) - runBlocking { - tradePersister.saveTrade(tradeEvent) - } - } + @Autowired + fun configureDeadLetterListener(dltKafkaListener: DLTKafkaListener, deadLetterListener: DeadLetterListener) { + dltKafkaListener.addEventListener(deadLetterListener) } - class EventlogEventListener( - val orderPersister: OrderPersister, val eventPersister: EventPersister - ) : EventListener { - - private val log = LoggerFactory.getLogger(EventlogEventListener::class.java) - - override fun id(): String { - return "EventListener" - } - - override fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) { - log.debug("Receive CoreEvent {}", coreEvent) - runBlocking { - if (coreEvent is CreateOrderEvent) - orderPersister.saveOrder(coreEvent) - else if (coreEvent is RejectOrderEvent) - orderPersister.rejectOrder(coreEvent) - else if (coreEvent is 
UpdatedOrderEvent) - orderPersister.updateOrder(coreEvent) - else if (coreEvent is CancelOrderEvent) - orderPersister.cancelOrder(coreEvent) - eventPersister.saveEvent(coreEvent) - } - } - } } \ No newline at end of file diff --git a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/DeadLetterListener.kt b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/DeadLetterListener.kt new file mode 100644 index 000000000..0a9ce8f5b --- /dev/null +++ b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/DeadLetterListener.kt @@ -0,0 +1,46 @@ +package co.nilin.opex.eventlog.app.listeners + +import co.nilin.opex.eventlog.core.inout.DeadLetterEvent +import co.nilin.opex.eventlog.core.spi.DeadLetterPersister +import co.nilin.opex.eventlog.ports.kafka.listener.spi.DLTListener +import kotlinx.coroutines.runBlocking +import org.apache.kafka.common.header.Headers +import org.slf4j.LoggerFactory +import org.springframework.kafka.support.KafkaHeaders +import java.time.Instant +import java.time.LocalDateTime +import java.util.* + +class DeadLetterListener(private val persister: DeadLetterPersister) : DLTListener { + + private val logger = LoggerFactory.getLogger(DeadLetterListener::class.java) + + override fun id(): String { + return "EventLogDeadLetterListener" + } + + override fun onEvent(event: String?, partition: Int, offset: Long, timestamp: Long, headers: Headers) = runBlocking { + logger.info("Dead letter event received: $event") + val map = hashMapOf().apply { + headers.forEach { + put(it.key(), it.value().toString(Charsets.UTF_8)) + } + } + + val dlt = DeadLetterEvent( + map["dlt-origin-module"]!!, + map[KafkaHeaders.DLT_ORIGINAL_TOPIC], + map[KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP], + map[KafkaHeaders.DLT_EXCEPTION_MESSAGE], + map[KafkaHeaders.DLT_EXCEPTION_STACKTRACE], + map[KafkaHeaders.DLT_EXCEPTION_FQCN], + event, + LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), 
TimeZone.getDefault().toZoneId()) + ) + + persister.save(dlt) + logger.info("DLT persisted") + } + + +} \ No newline at end of file diff --git a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/EventlogEventListener.kt b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/EventlogEventListener.kt new file mode 100644 index 000000000..7f69846fd --- /dev/null +++ b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/EventlogEventListener.kt @@ -0,0 +1,33 @@ +package co.nilin.opex.eventlog.app.listeners + +import co.nilin.opex.eventlog.core.spi.EventPersister +import co.nilin.opex.eventlog.core.spi.OrderPersister +import co.nilin.opex.eventlog.ports.kafka.listener.spi.EventListener +import co.nilin.opex.matching.engine.core.eventh.events.* +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory + +class EventlogEventListener( + private val orderPersister: OrderPersister, + private val eventPersister: EventPersister +) : EventListener { + + private val log = LoggerFactory.getLogger(EventlogEventListener::class.java) + + override fun id(): String { + return "EventListener" + } + + override fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) { + log.debug("Receive CoreEvent $coreEvent") + runBlocking { + when (coreEvent) { + is CreateOrderEvent -> orderPersister.saveOrder(coreEvent) + is RejectOrderEvent -> orderPersister.rejectOrder(coreEvent) + is UpdatedOrderEvent -> orderPersister.updateOrder(coreEvent) + is CancelOrderEvent -> orderPersister.cancelOrder(coreEvent) + } + eventPersister.saveEvent(coreEvent) + } + } +} \ No newline at end of file diff --git a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/EventlogTradeListener.kt b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/EventlogTradeListener.kt new file mode 100644 index 000000000..968d2705c --- /dev/null +++ 
b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/EventlogTradeListener.kt @@ -0,0 +1,23 @@ +package co.nilin.opex.eventlog.app.listeners + +import co.nilin.opex.eventlog.core.spi.TradePersister +import co.nilin.opex.eventlog.ports.kafka.listener.spi.TradeListener +import co.nilin.opex.matching.engine.core.eventh.events.TradeEvent +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory + +class EventlogTradeListener(private val tradePersister: TradePersister) : TradeListener { + + private val log = LoggerFactory.getLogger(EventlogTradeListener::class.java) + + override fun id(): String { + return "TradeListener" + } + + override fun onTrade(tradeEvent: TradeEvent, partition: Int, offset: Long, timestamp: Long) { + log.debug("Receive TradeEvent {}", tradeEvent) + runBlocking { + tradePersister.saveTrade(tradeEvent) + } + } +} \ No newline at end of file diff --git a/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/OrderListener.kt b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/OrderListener.kt new file mode 100644 index 000000000..c4cc54a06 --- /dev/null +++ b/eventlog/eventlog-app/src/main/kotlin/co/nilin/opex/eventlog/app/listeners/OrderListener.kt @@ -0,0 +1,30 @@ +package co.nilin.opex.eventlog.app.listeners + +import co.nilin.opex.eventlog.core.spi.OrderPersister +import co.nilin.opex.eventlog.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.eventlog.ports.kafka.listener.spi.OrderSubmitRequestListener +import co.nilin.opex.matching.engine.core.eventh.events.SubmitOrderEvent + +class OrderListener(private val orderPersister: OrderPersister) : OrderSubmitRequestListener { + + override fun id(): String { + return "OrderListener" + } + + override suspend fun onOrder(order: OrderSubmitRequest, partition: Int, offset: Long, timestamp: Long) { + orderPersister.submitOrder( + SubmitOrderEvent( + order.ouid, + order.uuid, + order.orderId, + order.pair, + 
order.price, + order.quantity, + 0, + order.direction, + order.matchConstraint, + order.orderType + ) + ) + } +} \ No newline at end of file diff --git a/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/inout/DeadLetterEvent.kt b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/inout/DeadLetterEvent.kt new file mode 100644 index 000000000..67429dc97 --- /dev/null +++ b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/inout/DeadLetterEvent.kt @@ -0,0 +1,14 @@ +package co.nilin.opex.eventlog.core.inout + +import java.time.LocalDateTime + +data class DeadLetterEvent( + val originModule: String, + val originTopic: String?, + val consumerGroup: String?, + val exceptionMessage: String?, + val exceptionStacktrace: String?, + val exceptionClassName: String?, + val value: String?, + val timestamp: LocalDateTime, +) \ No newline at end of file diff --git a/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/inout/OriginModule.kt b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/inout/OriginModule.kt new file mode 100644 index 000000000..3065450b3 --- /dev/null +++ b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/inout/OriginModule.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.eventlog.core.inout + +enum class OriginModule { + + ACCOUNTANT, API, MATCHING_ENGINE, WALLET, WEBSOCKET + +} \ No newline at end of file diff --git a/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/spi/DeadLetter.kt b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/spi/DeadLetter.kt new file mode 100644 index 000000000..c291d8e8e --- /dev/null +++ b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/spi/DeadLetter.kt @@ -0,0 +1,4 @@ +package co.nilin.opex.eventlog.core.spi + +interface DeadLetter { +} \ No newline at end of file diff --git a/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/spi/DeadLetterPersister.kt 
b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/spi/DeadLetterPersister.kt new file mode 100644 index 000000000..10e343924 --- /dev/null +++ b/eventlog/eventlog-core/src/main/kotlin/co/nilin/opex/eventlog/core/spi/DeadLetterPersister.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.eventlog.core.spi + +import co.nilin.opex.eventlog.core.inout.DeadLetterEvent + +interface DeadLetterPersister { + + suspend fun save(event: DeadLetterEvent) + +} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml index f0396f959..897a55ea1 100644 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/pom.xml @@ -32,6 +32,10 @@ co.nilin.opex.matching.engine.core matching-engine-core + + co.nilin.opex.eventlog.core + eventlog-core + org.springframework.kafka spring-kafka diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt index b4f56cf3e..d984b6792 100644 --- a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/EventLogKafkaConfig.kt @@ -1,39 +1,36 @@ package co.nilin.opex.eventlog.ports.kafka.listener.config - +import co.nilin.opex.eventlog.ports.kafka.listener.consumer.DLTKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.OrderKafkaListener import co.nilin.opex.eventlog.ports.kafka.listener.consumer.TradeKafkaListener import 
co.nilin.opex.matching.engine.core.eventh.events.CoreEvent import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.* +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.listener.ConcurrentMessageListenerContainer import org.springframework.kafka.listener.ContainerProperties import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.kafka.support.serializer.JsonSerializer import java.util.regex.Pattern - @Configuration class EventLogKafkaConfig { @Value("\${spring.kafka.bootstrap-servers}") - private val bootstrapServers: String? = null + private lateinit var bootstrapServers: String @Value("\${spring.kafka.consumer.group-id}") - private val groupId: String? = null + private lateinit var groupId: String @Bean("eventLogConsumerConfig") - fun consumerConfigs(): Map? 
{ + fun consumerConfigs(): Map { return mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to groupId, @@ -44,29 +41,25 @@ class EventLogKafkaConfig { ) } - @Bean("eventLogConsumerFactory") - fun consumerFactory(@Qualifier("eventLogConsumerConfig") consumerConfigs: Map): ConsumerFactory { - return DefaultKafkaConsumerFactory(consumerConfigs) - } - - @Bean("eventLogProducerConfig") - fun producerConfigs(): Map { + @Bean("dltConsumerConfig") + fun dltConsumerConfig(): Map { return mapOf( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, - ProducerConfig.ACKS_CONFIG to "all" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to groupId, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", ) } - @Bean("eventLogProducerFactory") - fun producerFactory(@Qualifier("eventLogProducerConfig") producerConfigs: Map): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs) + @Bean("eventLogConsumerFactory") + fun consumerFactory(@Qualifier("eventLogConsumerConfig") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) } - @Bean("eventLogKafkaTemplate") - fun kafkaTemplate(@Qualifier("eventLogProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { - return KafkaTemplate(producerFactory) + @Bean + fun dltConsumerFactory(@Qualifier("dltConsumerConfig") configs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(configs) } @Autowired @@ -108,5 +101,17 @@ class EventLogKafkaConfig { container.start() } + @Autowired + @ConditionalOnBean(OrderKafkaListener::class) + fun 
configureDLTListener( + orderListener: DLTKafkaListener, + consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile(".*\\.DLT")) + containerProps.messageListener = orderListener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("DLTKafkaListenerContainer") + container.start() + } } \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/KafkaProducerConfig.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/KafkaProducerConfig.kt new file mode 100644 index 000000000..ec7abee87 --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -0,0 +1,41 @@ +package co.nilin.opex.eventlog.ports.kafka.listener.config + +import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +@Configuration +class KafkaProducerConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean("eventLogProducerConfig") + fun producerConfigs(): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + 
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all" + ) + } + + @Bean("eventLogProducerFactory") + fun producerFactory(@Qualifier("eventLogProducerConfig") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("eventLogKafkaTemplate") + fun kafkaTemplate(@Qualifier("eventLogProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + +} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/KafkaTopicConfig.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/KafkaTopicConfig.kt new file mode 100644 index 000000000..2ac0fab81 --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/config/KafkaTopicConfig.kt @@ -0,0 +1,25 @@ +package co.nilin.opex.eventlog.ports.kafka.listener.config + +import org.apache.kafka.clients.admin.NewTopic +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext + +@Configuration +class KafkaTopicConfig { + + @Autowired + fun createTopics(applicationContext: GenericApplicationContext) { + with(applicationContext) { + registerBean("topic_events.DLT", NewTopic::class.java, "events.DLT", 10, 1) + registerBean("topic_orders.DLT", NewTopic::class.java, "orders.DLT", 10, 1) + registerBean("topic_trades.DLT", NewTopic::class.java, "trades.DLT", 10, 1) + registerBean("topic_tempevents.DLT", NewTopic::class.java, "tempevents.DLT", 10, 1) + registerBean("topic_richTrade.DLT", NewTopic::class.java, "richTrade.DLT", 10, 1) 
+ registerBean("topic_richOrder.DLT", NewTopic::class.java, "richOrder.DLT", 10, 1) + registerBean("topic_admin_event.DLT", NewTopic::class.java, "admin_event.DLT", 10, 1) + registerBean("topic_auth_user_created.DLT", NewTopic::class.java, "auth_user_created.DLT", 10, 1) + } + } + +} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/DLTKafkaListener.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/DLTKafkaListener.kt new file mode 100644 index 000000000..aa9ee7a69 --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/consumer/DLTKafkaListener.kt @@ -0,0 +1,27 @@ +package co.nilin.opex.eventlog.ports.kafka.listener.consumer + +import co.nilin.opex.eventlog.ports.kafka.listener.spi.DLTListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component + +@Component +class DLTKafkaListener : MessageListener { + + private val listeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + + listeners.forEach { it.onEvent(data.value(), data.partition(), data.offset(), data.timestamp(), data.headers()) } + } + + fun addEventListener(tl: DLTListener) { + listeners.add(tl) + } + + fun removeEventListener(tl: DLTListener) { + listeners.removeIf { + it.id() == tl.id() + } + } +} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/spi/DLTListener.kt b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/spi/DLTListener.kt new file mode 100644 index 000000000..6832de68e --- /dev/null +++ 
b/eventlog/eventlog-ports/eventlog-eventlistener-kafka/src/main/kotlin/co/nilin/opex/eventlog/ports/kafka/listener/spi/DLTListener.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.eventlog.ports.kafka.listener.spi + +import org.apache.kafka.common.header.Headers + +interface DLTListener { + fun id(): String + fun onEvent(event: String?, partition: Int, offset: Long, timestamp: Long, headers: Headers) +} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/dao/DeadLetterEventRepository.kt b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/dao/DeadLetterEventRepository.kt new file mode 100644 index 000000000..97c430eef --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/dao/DeadLetterEventRepository.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.eventlog.ports.postgres.dao + +import co.nilin.opex.eventlog.ports.postgres.model.DeadLetterEventModel +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository + +@Repository +interface DeadLetterEventRepository : ReactiveCrudRepository { +} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/impl/DeadLetterPersisterImpl.kt b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/impl/DeadLetterPersisterImpl.kt new file mode 100644 index 000000000..d4506cef7 --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/impl/DeadLetterPersisterImpl.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.eventlog.ports.postgres.impl + +import co.nilin.opex.eventlog.core.inout.DeadLetterEvent +import co.nilin.opex.eventlog.core.spi.DeadLetterPersister +import 
co.nilin.opex.eventlog.ports.postgres.dao.DeadLetterEventRepository +import co.nilin.opex.eventlog.ports.postgres.model.DeadLetterEventModel +import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.stereotype.Component + +@Component +class DeadLetterPersisterImpl(private val repository: DeadLetterEventRepository) : DeadLetterPersister { + + override suspend fun save(event: DeadLetterEvent) { + repository.save( + with(event) { + DeadLetterEventModel( + originModule, + originTopic, + consumerGroup, + exceptionMessage, + exceptionStacktrace, + exceptionClassName, + value, + timestamp + ) + } + ).awaitFirstOrNull() + } +} \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/model/DeadLetterEventModel.kt b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/model/DeadLetterEventModel.kt new file mode 100644 index 000000000..91d44b3b3 --- /dev/null +++ b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/kotlin/co/nilin/opex/eventlog/ports/postgres/model/DeadLetterEventModel.kt @@ -0,0 +1,18 @@ +package co.nilin.opex.eventlog.ports.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Table +import java.time.LocalDateTime + +@Table("dead_letter_events") +data class DeadLetterEventModel( + val originModule: String, + val originTopic: String?, + val consumerGroup: String?, + val exceptionMessage: String?, + val exceptionStacktrace: String?, + val exceptionClassName: String?, + val value: String?, + val timestamp: LocalDateTime = LocalDateTime.now(), + @Id var id: Long? 
= null +) \ No newline at end of file diff --git a/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/resources/schema.sql b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/resources/schema.sql index 8c5be4314..f4c381101 100644 --- a/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/resources/schema.sql +++ b/eventlog/eventlog-ports/eventlog-persister-postgres/src/main/resources/schema.sql @@ -64,3 +64,16 @@ CREATE TABLE IF NOT EXISTS opex_trades trade_date TIMESTAMP NOT NULL, create_date TIMESTAMP NOT NULL ); + +CREATE TABLE IF NOT EXISTS dead_letter_events +( + id SERIAL PRIMARY KEY, + origin_module VARCHAR(72) NOT NULL, + origin_topic VARCHAR(72), + consumer_group VARCHAR(72), + exception_message TEXT, + exception_stacktrace TEXT, + exception_class_name TEXT, + timestamp TIMESTAMP NOT NULL, + value TEXT +) diff --git a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt index b452fcbdd..dc55ef4f2 100644 --- a/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt +++ b/matching-engine/matching-engine-app/src/main/kotlin/co/nilin/opex/matching/engine/app/listener/OrderListener.kt @@ -2,7 +2,7 @@ package co.nilin.opex.matching.engine.app.listener import co.nilin.opex.matching.engine.app.bl.OrderBooks import co.nilin.opex.matching.engine.core.inout.OrderCreateCommand -import co.nilin.opex.matching.engine.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderSubmitRequestListener class OrderListener : OrderSubmitRequestListener { diff --git 
a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/inout/OrderSubmitRequest.kt b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt similarity index 94% rename from matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/inout/OrderSubmitRequest.kt rename to matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt index f160f1d56..d1519366f 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/inout/OrderSubmitRequest.kt +++ b/matching-engine/matching-engine-core/src/main/kotlin/co/nilin/opex/matching/engine/core/inout/OrderSubmitRequest.kt @@ -1,4 +1,4 @@ -package co.nilin.opex.matching.engine.ports.kafka.listener.inout +package co.nilin.opex.matching.engine.core.inout import co.nilin.opex.matching.engine.core.model.MatchConstraint import co.nilin.opex.matching.engine.core.model.OrderDirection diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt index a09a0b2a4..8f2edb314 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/config/OrderKafkaConfig.kt @@ -1,29 +1,23 @@ package 
co.nilin.opex.matching.engine.ports.kafka.listener.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest import co.nilin.opex.matching.engine.ports.kafka.listener.consumer.EventKafkaListener import co.nilin.opex.matching.engine.ports.kafka.listener.consumer.OrderKafkaListener -import co.nilin.opex.matching.engine.ports.kafka.listener.inout.OrderSubmitRequest -import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.context.support.GenericApplicationContext -import org.springframework.kafka.config.TopicBuilder -import org.springframework.kafka.core.* -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer -import org.springframework.kafka.listener.ContainerProperties -import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.* import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.kafka.support.serializer.JsonSerializer -import java.util.function.Supplier +import org.springframework.util.backoff.FixedBackOff import java.util.regex.Pattern @Configuration 
@@ -33,95 +27,68 @@ class OrderKafkaConfig { private lateinit var bootstrapServers: String @Value("\${spring.kafka.consumer.group-id}") - private val groupId: String? = null + private lateinit var groupId: String @Value("\${spring.app.symbols}") - private val symbols: String? = null - - @Autowired - private val applicationContext: GenericApplicationContext? = null - - @Autowired - fun createTopics() { - symbols!!.split(",") - .map { s -> "orders_$s" } - .forEach { topic -> - applicationContext?.registerBean("topic_${topic}", NewTopic::class.java, Supplier { - TopicBuilder.name(topic) - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) - } - } - - @Bean("orderProducerConfigs") - fun producerConfigs(): Map? { - val props: MutableMap = HashMap() - props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java - props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java - return props - } - - @Bean("orderProducerFactory") - fun producerFactory(@Qualifier("orderProducerConfigs") producerConfigs: Map): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs) - } - - @Bean("orderKafkaTemplate") - fun kafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { - return KafkaTemplate(producerFactory) - } - - @Bean("orderConsumerConfigs") - fun consumerConfigs(): Map? 
{ - val props: MutableMap = HashMap() - props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - props[ConsumerConfig.GROUP_ID_CONFIG] = groupId - props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java - props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java - props[JsonDeserializer.TRUSTED_PACKAGES] = "co.nilin.opex.*" - props[JsonDeserializer.TYPE_MAPPINGS] = - "order_request:co.nilin.opex.matching.engine.ports.kafka.listener.inout.OrderSubmitRequest" - return props + private lateinit var symbols: String + + @Bean("consumerConfigs") + fun consumerConfigs(): Map { + return mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to groupId, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, + JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", + JsonDeserializer.TYPE_MAPPINGS to "order_request:co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest" + ) } @Bean("orderConsumerFactory") - fun consumerFactory(@Qualifier("orderConsumerConfigs") consumerConfigs: Map): ConsumerFactory { + fun consumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @Bean("eventConsumerFactory") - fun eventConsumerFactory(@Qualifier("orderConsumerConfigs") consumerConfigs: Map): ConsumerFactory { + fun eventConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @Autowired fun configureListener( orderKafkaListener: OrderKafkaListener, - @Qualifier("orderConsumerFactory") consumerFactory: ConsumerFactory, - kafkaAdmin: KafkaAdmin + @Qualifier("orderKafkaTemplate") template: KafkaTemplate, + @Qualifier("orderConsumerFactory") consumerFactory: ConsumerFactory ) { - val topics = 
symbols!!.split(",").map { s -> "orders_$s" }.toTypedArray() + val topics = symbols.split(",").map { s -> "orders_$s" }.toTypedArray() val containerProps = ContainerProperties(*topics) containerProps.messageListener = orderKafkaListener val container = KafkaMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("OrderKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "orders.DLT") container.start() } @Autowired fun configureEventListener( eventListener: EventKafkaListener, + @Qualifier("eventsKafkaTemplate") template: KafkaTemplate, @Qualifier("eventConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("events_.*")) containerProps.messageListener = eventListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("EventKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "events.DLT") container.start() } + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { + val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> + cr.headers().add("dlt-origin-module", "MATCHING_ENGINE".toByteArray()) + TopicPartition(dltTopic, cr.partition()) + } + return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20)) + } + } \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt index 48118d004..69a6327d2 100644 --- 
a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/consumer/OrderKafkaListener.kt @@ -1,6 +1,6 @@ package co.nilin.opex.matching.engine.ports.kafka.listener.consumer -import co.nilin.opex.matching.engine.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest import co.nilin.opex.matching.engine.ports.kafka.listener.spi.OrderSubmitRequestListener import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.ConsumerRecord diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/inout/OrderSubmitResult.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/inout/OrderSubmitResult.kt deleted file mode 100644 index ec74b34cd..000000000 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/inout/OrderSubmitResult.kt +++ /dev/null @@ -1,3 +0,0 @@ -package co.nilin.opex.matching.engine.ports.kafka.listener.inout - -class OrderSubmitResult(offset: Long?) 
\ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt index f51cfb696..e2268cef6 100644 --- a/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt +++ b/matching-engine/matching-engine-ports/matching-engine-eventlistener-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/listener/spi/OrderSubmitRequestListener.kt @@ -1,6 +1,6 @@ package co.nilin.opex.matching.engine.ports.kafka.listener.spi -import co.nilin.opex.matching.engine.ports.kafka.listener.inout.OrderSubmitRequest +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest interface OrderSubmitRequestListener { fun id(): String diff --git a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt index 40b2f51bd..53828b3b3 100644 --- a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt +++ b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/EventsKafkaConfig.kt @@ -1,24 +1,17 @@ package co.nilin.opex.matching.engine.ports.kafka.submitter.config import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent -import 
org.apache.kafka.clients.admin.AdminClientConfig -import org.apache.kafka.clients.admin.NewTopic +import co.nilin.opex.matching.engine.core.inout.OrderSubmitRequest import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.serialization.StringSerializer -import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.context.support.GenericApplicationContext -import org.springframework.kafka.config.TopicBuilder import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaAdmin import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.ProducerFactory import org.springframework.kafka.support.serializer.JsonSerializer -import java.util.function.Supplier @Configuration class EventsKafkaConfig { @@ -26,13 +19,7 @@ class EventsKafkaConfig { @Value("\${spring.kafka.bootstrap-servers}") private lateinit var bootstrapServers: String - @Value("\${spring.app.symbols}") - private val symbols: String? = null - - @Autowired - private val applicationContext: GenericApplicationContext? 
= null - - @Bean("eventsProducerConfigs") + @Bean("producerConfigs") fun producerConfigs(): Map { return mapOf( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, @@ -43,45 +30,23 @@ class EventsKafkaConfig { } @Bean("eventsProducerFactory") - fun producerFactory(@Qualifier("eventsProducerConfigs") producerConfigs: Map): ProducerFactory { + fun eventProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { return DefaultKafkaProducerFactory(producerConfigs) } @Bean("eventsKafkaTemplate") - fun kafkaTemplate(@Qualifier("eventsProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + fun eventKafkaTemplate(@Qualifier("eventsProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { return KafkaTemplate(producerFactory) } - @Bean - fun admin(): KafkaAdmin? { - val configs: MutableMap = HashMap() - configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers - return KafkaAdmin(configs) + @Bean("orderProducerFactory") + fun orderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) } - @Autowired - fun createTopics() { - symbols!!.split(",") - .map { s -> "events_$s" } - .forEach { topic -> - applicationContext?.registerBean("topic_${topic}", NewTopic::class.java, Supplier { - TopicBuilder.name(topic) - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) - } - symbols.split(",") - .map { s -> "trades_$s" } - .forEach { topic -> - applicationContext?.registerBean("topic_${topic}", NewTopic::class.java, Supplier { - TopicBuilder.name(topic) - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) - } + @Bean("orderKafkaTemplate") + fun orderKafkaTemplate(@Qualifier("orderProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) } + } \ No newline at end of file diff 
--git a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/KafkaAdminConfig.kt b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/KafkaAdminConfig.kt new file mode 100644 index 000000000..6184f49aa --- /dev/null +++ b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/KafkaAdminConfig.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.matching.engine.ports.kafka.submitter.config + +import org.apache.kafka.clients.admin.AdminClientConfig +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.KafkaAdmin + +@Configuration +class KafkaAdminConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean + fun admin(): KafkaAdmin? 
{ + val configs = hashMapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers) + return KafkaAdmin(configs) + } + +} \ No newline at end of file diff --git a/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/KafkaTopicConfig.kt b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/KafkaTopicConfig.kt new file mode 100644 index 000000000..5cbc89854 --- /dev/null +++ b/matching-engine/matching-engine-ports/matching-engine-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/engine/ports/kafka/submitter/config/KafkaTopicConfig.kt @@ -0,0 +1,57 @@ +package co.nilin.opex.matching.engine.ports.kafka.submitter.config + +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.common.config.TopicConfig +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext +import org.springframework.kafka.config.TopicBuilder +import java.util.function.Supplier + +@Configuration +class KafkaTopicConfig { + + @Value("\${spring.app.symbols}") + private lateinit var symbols: String + + @Autowired + fun createTopics(applicationContext: GenericApplicationContext) { + symbols.split(",") + .map { s -> "orders_$s" } + .forEach { topic -> + applicationContext.registerBean("topic_${topic}", NewTopic::class.java, Supplier { + TopicBuilder.name(topic) + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + } + + symbols.split(",") + .map { s -> "events_$s" } + .forEach { topic -> + applicationContext.registerBean("topic_${topic}", NewTopic::class.java, Supplier { + TopicBuilder.name(topic) + .partitions(10) + .replicas(3) + 
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + } + + symbols.split(",") + .map { s -> "trades_$s" } + .forEach { topic -> + applicationContext.registerBean("topic_${topic}", NewTopic::class.java, Supplier { + TopicBuilder.name(topic) + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + } + } + +} \ No newline at end of file diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/EventSubmitter.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/EventSubmitter.kt index fadcbde0a..53a135935 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/EventSubmitter.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/EventSubmitter.kt @@ -14,14 +14,15 @@ class EventSubmitter(val kafkaTemplate: KafkaTemplate) { private val logger = LoggerFactory.getLogger(EventSubmitter::class.java) - suspend fun submit(event: CoreEvent): OrderSubmitResult = suspendCoroutine { + suspend fun submit(event: CoreEvent): OrderSubmitResult = suspendCoroutine { cont -> logger.info("Submit event for pair ${event.pair} = ${event::class.java}") - val sendFuture = kafkaTemplate.send("events_${event.pair.leftSideName}_${event.pair.rightSideName}", event) - sendFuture.addCallback({ sendResult -> - it.resume(OrderSubmitResult(sendResult?.recordMetadata?.offset())) - }, { exception -> - it.resumeWithException(exception) + val sendFuture = kafkaTemplate.send("events_${event.pair.leftSideName}_${event.pair.rightSideName}", event) + sendFuture.addCallback({ + cont.resume(OrderSubmitResult(it?.recordMetadata?.offset())) 
+ }, { + logger.error("Error submitting Event", it) + cont.resumeWithException(it) }) } diff --git a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt index 25e32cb70..57ed1bd61 100644 --- a/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt +++ b/matching-gateway/matching-gateway-port/matching-gateway-submitter-kafka/src/main/kotlin/co/nilin/opex/matching/gateway/ports/kafka/submitter/service/OrderSubmitter.kt @@ -2,6 +2,7 @@ package co.nilin.opex.matching.gateway.ports.kafka.submitter.service import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitRequest import co.nilin.opex.matching.gateway.ports.kafka.submitter.inout.OrderSubmitResult +import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Component import kotlin.coroutines.resume @@ -10,18 +11,19 @@ import kotlin.coroutines.suspendCoroutine @Component class OrderSubmitter(val kafkaTemplate: KafkaTemplate) { + + private val logger = LoggerFactory.getLogger(OrderSubmitter::class.java) + suspend fun submit(order: OrderSubmitRequest): OrderSubmitResult = suspendCoroutine { cont -> - println("OrderSubmit!") + logger.info("Submitting OrderSubmitRequest: ouid=${order.ouid}") + val sendFuture = kafkaTemplate.send("orders_${order.pair.leftSideName}_${order.pair.rightSideName}", order) - sendFuture.addCallback({ sendResult -> - cont.resume(OrderSubmitResult(sendResult?.recordMetadata?.offset())) - }, { exception -> - cont.resumeWithException(exception) + sendFuture.addCallback({ + 
cont.resume(OrderSubmitResult(it?.recordMetadata?.offset())) + }, { + logger.error("Error submitting OrderSubmitRequest", it) + cont.resumeWithException(it) }) - /*cont.invokeOnCancellation { - sendFuture.cancel(true) - }*/ } - } \ No newline at end of file diff --git a/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/ExtendedEventListenerProvider.kt b/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/ExtendedEventListenerProvider.kt index a4e548a28..35cdec848 100644 --- a/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/ExtendedEventListenerProvider.kt +++ b/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/ExtendedEventListenerProvider.kt @@ -4,6 +4,7 @@ import co.nilin.opex.auth.gateway.ApplicationContextHolder import co.nilin.opex.auth.gateway.model.AuthEvent import co.nilin.opex.auth.gateway.model.UserCreatedEvent import com.fasterxml.jackson.databind.DeserializationFeature +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import org.keycloak.events.Event import org.keycloak.events.EventListenerProvider import org.keycloak.events.EventType @@ -17,10 +18,10 @@ import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate class ExtendedEventListenerProvider(private val session: KeycloakSession) : EventListenerProvider { - val logger = LoggerFactory.getLogger(ExtendedEventListenerProvider::class.java) - private val model: RealmProvider - val objectMapper = com.fasterxml.jackson.module.kotlin.jacksonObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + + private val logger = LoggerFactory.getLogger(ExtendedEventListenerProvider::class.java) + private val model: RealmProvider = session.realms() + private val objectMapper = jacksonObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) data class UserData( val username: String, 
@@ -35,9 +36,9 @@ class ExtendedEventListenerProvider(private val session: KeycloakSession) : Even override fun onEvent(event: Event) { - logger.info("## NEW %s EVENT", event.getType()) + logger.info("## NEW %s EVENT", event.type) logger.info("-----------------------------------------------------------") - event.getDetails().forEach { key, value -> logger.info(key.toString() + ": " + value) } + event.details.forEach { (key, value) -> logger.info("$key: $value") } // USE CASE SCENARIO, I'm sure there are better use case scenario's :p // @@ -47,15 +48,15 @@ class ExtendedEventListenerProvider(private val session: KeycloakSession) : Even // When the user tries to login after a failed attempt, // the user remains unverified and when trying to login will receive another verify account email. - if (EventType.VERIFY_EMAIL.equals(event.getType())) { - val realm = model.getRealm(event.getRealmId()) - val user = session.users().getUserById(event.getUserId(), realm) + if (EventType.VERIFY_EMAIL == event.type) { + val realm = model.getRealm(event.realmId) + val user = session.users().getUserById(event.userId, realm) if (user != null && user.email != null && user.isEmailVerified) { - logger.info("USER HAS VERIFIED EMAIL : " + event.getUserId()) + logger.info("USER HAS VERIFIED EMAIL : ${event.userId}" ) // Example of adding an attribute when this event happens user.setSingleAttribute("attribute-key", "attribute-value") - val userUuidDto = UserUuidDto(event.getType().name, event.getUserId(), user.email) + val userUuidDto = UserUuidDto(event.type.name, event.userId, user.email) val userVerifiedTransaction = UserVerifiedTransaction(userUuidDto) // enlistPrepare -> if our transaction fails than the user is NOT verified @@ -73,9 +74,7 @@ class ExtendedEventListenerProvider(private val session: KeycloakSession) : Even logger.info("Resource path" + ": " + adminEvent.resourcePath) logger.info("Resource type" + ": " + adminEvent.resourceType) logger.info("Operation type" + ": " + 
adminEvent.operationType) - if (ResourceType.USER.equals(adminEvent.resourceType) - && OperationType.CREATE.equals(adminEvent.operationType) - ) { + if (ResourceType.USER == adminEvent.resourceType && OperationType.CREATE == adminEvent.operationType) { logger.info("A new user has been created") val userData = objectMapper.readValue(adminEvent.representation, UserData::class.java) val uuid = adminEvent.resourcePath.substringAfter("/") @@ -92,11 +91,6 @@ class ExtendedEventListenerProvider(private val session: KeycloakSession) : Even // Nothing to close } - - init { - model = session.realms() - } - class UserVerifiedTransaction(private val userUuidDto: UserUuidDto) : AbstractKeycloakTransaction() { override fun commitImpl() { logger.info("## USER VERIFIED TRANSACTION") diff --git a/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/UserManagementResource.kt b/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/UserManagementResource.kt index d6aa62861..107515f91 100644 --- a/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/UserManagementResource.kt +++ b/user-management/keycloak-gateway/src/main/kotlin/co/nilin/opex/auth/gateway/extension/UserManagementResource.kt @@ -1,8 +1,11 @@ package co.nilin.opex.auth.gateway.extension +import co.nilin.opex.auth.gateway.ApplicationContextHolder import co.nilin.opex.auth.gateway.data.RegisterUserRequest import co.nilin.opex.auth.gateway.data.RegisterUserResponse import co.nilin.opex.auth.gateway.data.UserProfileInfo +import co.nilin.opex.auth.gateway.model.AuthEvent +import co.nilin.opex.auth.gateway.model.UserCreatedEvent import co.nilin.opex.auth.gateway.utils.ErrorHandler import co.nilin.opex.auth.gateway.utils.ResourceAuthenticator import co.nilin.opex.utility.error.data.OpexError @@ -16,6 +19,7 @@ import org.keycloak.models.UserModel import org.keycloak.services.resource.RealmResourceProvider import 
org.keycloak.services.resources.LoginActionsService import org.slf4j.LoggerFactory +import org.springframework.kafka.core.KafkaTemplate import java.util.concurrent.TimeUnit import javax.ws.rs.* import javax.ws.rs.core.MediaType @@ -26,6 +30,10 @@ class UserManagementResource(private val session: KeycloakSession) : RealmResour private val logger = LoggerFactory.getLogger(UserManagementResource::class.java) private val opexRealm = session.realms().getRealm("opex") + private val kafkaTemplate by lazy { + ApplicationContextHolder.getCurrentContext()!! + .getBean("authKafkaTemplate") as KafkaTemplate + } @POST @Path("user") @@ -52,6 +60,8 @@ class UserManagementResource(private val session: KeycloakSession) : RealmResour } logger.info("User create response ${user.id}") + sendUserEvent(user) + return Response.ok(RegisterUserResponse(user.id)).build() } @@ -167,6 +177,12 @@ class UserManagementResource(private val session: KeycloakSession) : RealmResour } } + private fun sendUserEvent(user: UserModel) { + val kafkaEvent = UserCreatedEvent(user.id, user.firstName, user.lastName, user.email!!) 
+ kafkaTemplate.send("auth_user_created", kafkaEvent) + logger.info("$kafkaEvent produced in kafka topic") + } + override fun close() { } diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt new file mode 100644 index 000000000..574120a10 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -0,0 +1,51 @@ +package co.nilin.opex.wallet.ports.kafka.listener.config + +import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +@Configuration +class KafkaProducerConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean("producerConfigs") + fun producerConfigs(): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all" + ) + } + + @Bean("walletProducerFactory") + fun 
producerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("walletKafkaTemplate") + fun kafkaTemplate(factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + + @Bean + fun adminProducerFactory(@Qualifier("producerConfigs") config: Map): ProducerFactory { + return DefaultKafkaProducerFactory(config) + } + + @Bean + fun adminKafkaTemplate(factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt new file mode 100644 index 000000000..8f172e424 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/KafkaTopicConfig.kt @@ -0,0 +1,23 @@ +package co.nilin.opex.wallet.ports.kafka.listener.config + +import org.apache.kafka.clients.admin.NewTopic +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext +import org.springframework.kafka.config.TopicBuilder +import java.util.function.Supplier + +@Configuration +class KafkaTopicConfig { + + @Autowired + fun createTopics(applicationContext: GenericApplicationContext) { + applicationContext.registerBean("topic_auth_user_created", NewTopic::class.java, Supplier { + TopicBuilder.name("auth_user_created") + .partitions(1) + .replicas(1) + .build() + }) + } + +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt 
b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt index f3cba7bfc..e89b86978 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt @@ -4,37 +4,33 @@ import co.nilin.opex.wallet.ports.kafka.listener.consumer.AdminEventKafkaListene import co.nilin.opex.wallet.ports.kafka.listener.consumer.UserCreatedKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent -import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.context.support.GenericApplicationContext -import org.springframework.kafka.config.TopicBuilder -import org.springframework.kafka.core.* -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer -import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.* 
import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.kafka.support.serializer.JsonSerializer -import java.util.function.Supplier +import org.springframework.util.backoff.FixedBackOff import java.util.regex.Pattern @Configuration class WalletKafkaConfig { @Value("\${spring.kafka.bootstrap-servers}") - private val bootstrapServers: String? = null + private lateinit var bootstrapServers: String @Value("\${spring.kafka.consumer.group-id}") - private val groupId: String? = null + private lateinit var groupId: String - @Bean + @Bean("consumerConfigs") fun consumerConfigs(): Map { return mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, @@ -47,12 +43,12 @@ class WalletKafkaConfig { } @Bean("walletConsumerFactory") - fun consumerFactory(consumerConfigs: Map): ConsumerFactory { + fun consumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @Bean - fun adminEventsConsumerFactory(consumerConfigs: Map): ConsumerFactory { + fun adminEventsConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } @@ -60,12 +56,14 @@ class WalletKafkaConfig { @ConditionalOnBean(UserCreatedKafkaListener::class) fun configureUserCreatedListener( listener: UserCreatedKafkaListener, + template: KafkaTemplate, @Qualifier("walletConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("auth_user_created")) containerProps.messageListener = listener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("UserCreatedKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "auth_user_created.DLT") container.start() } @@ -73,43 +71,23 @@ class WalletKafkaConfig { @ConditionalOnBean(AdminEventKafkaListener::class) fun 
configureAdminEventListener( listener: AdminEventKafkaListener, + template: KafkaTemplate, consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("admin_event")) containerProps.messageListener = listener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("AdminEventKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "admin_event.DLT") container.start() } - @Bean("walletProducerConfig") - fun producerConfigs(): Map { - return mapOf( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, - ProducerConfig.ACKS_CONFIG to "all" - ) - } - - @Bean("walletProducerFactory") - fun producerFactory(@Qualifier("walletProducerConfig") producerConfigs: Map): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs) - } - - @Bean("walletKafkaTemplate") - fun kafkaTemplate(@Qualifier("walletProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { - return KafkaTemplate(producerFactory) - } - - @Autowired - fun createUserCreatedTopics(applicationContext: GenericApplicationContext) { - applicationContext.registerBean("topic_auth_user_created", NewTopic::class.java, Supplier { - TopicBuilder.name("auth_user_created") - .partitions(1) - .replicas(1) - .build() - }) + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { + val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> + cr.headers().add("dlt-origin-module", "WALLET".toByteArray()) + TopicPartition(dltTopic, cr.partition()) + } + return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20)) } } \ No newline at end of file diff --git 
a/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/KafkaProducerConfig.kt b/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/KafkaProducerConfig.kt new file mode 100644 index 000000000..2976e3b72 --- /dev/null +++ b/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/KafkaProducerConfig.kt @@ -0,0 +1,63 @@ +package co.nilin.opex.websocket.ports.kafka.listener.config + +import co.nilin.opex.accountant.core.inout.RichOrderEvent +import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer + +@Configuration +class KafkaProducerConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean("producerConfigs") + fun producerConfigs(): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all" + ) + } + + @Bean("eventsProducerFactory") + fun producerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return 
DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("eventKafkaTemplate") + fun kafkaTemplate(@Qualifier("eventsProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + + @Bean("richTradeProducerFactory") + fun richTradeProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("richTradeKafkaTemplate") + fun richTradeTemplate(@Qualifier("richTradeProducerFactory") factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + + @Bean("richOrderProducerFactory") + fun richOrderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("richOrderKafkaTemplate") + fun richOrderTemplate(@Qualifier("richOrderProducerFactory") factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + +} \ No newline at end of file diff --git a/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/KafkaTopicConfig.kt b/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/KafkaTopicConfig.kt new file mode 100644 index 000000000..d11c8fed8 --- /dev/null +++ b/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/KafkaTopicConfig.kt @@ -0,0 +1,33 @@ +package co.nilin.opex.websocket.ports.kafka.listener.config + +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.common.config.TopicConfig +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext +import org.springframework.kafka.config.TopicBuilder +import java.util.function.Supplier + 
+@Configuration +class KafkaTopicConfig { + + @Autowired + fun createTopics(applicationContext: GenericApplicationContext) { + applicationContext.registerBean("topic_richOrder", NewTopic::class.java, Supplier { + TopicBuilder.name("richOrder") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + + applicationContext.registerBean("topic_richTrade", NewTopic::class.java, Supplier { + TopicBuilder.name("richTrade") + .partitions(10) + .replicas(3) + .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + .build() + }) + } + +} \ No newline at end of file diff --git a/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/WebSocketKafkaConfig.kt b/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/WebSocketKafkaConfig.kt index d7b4cd49c..dbf7e3f28 100644 --- a/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/WebSocketKafkaConfig.kt +++ b/websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/websocket/ports/kafka/listener/config/WebSocketKafkaConfig.kt @@ -1,41 +1,38 @@ package co.nilin.opex.websocket.ports.kafka.listener.config +import co.nilin.opex.accountant.core.inout.RichOrderEvent +import co.nilin.opex.accountant.core.inout.RichTrade import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent import co.nilin.opex.websocket.ports.kafka.listener.consumer.OrderKafkaListener import co.nilin.opex.websocket.ports.kafka.listener.consumer.TradeKafkaListener -import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.TopicPartition import 
org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.condition.ConditionalOnBean import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.context.support.GenericApplicationContext -import org.springframework.kafka.config.TopicBuilder -import org.springframework.kafka.core.* -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer -import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.* import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.kafka.support.serializer.JsonSerializer -import java.util.function.Supplier +import org.springframework.util.backoff.FixedBackOff import java.util.regex.Pattern @Configuration class WebSocketKafkaConfig { @Value("\${spring.kafka.bootstrap-servers}") - private val bootstrapServers: String? = null + private lateinit var bootstrapServers: String @Value("\${spring.kafka.consumer.group-id}") - private val groupId: String? = null + private lateinit var groupId: String - @Bean("websocketConsumerConfig") - fun consumerConfigs(): Map? 
{ + @Bean("consumerConfigs") + fun consumerConfigs(): Map { return mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to groupId, @@ -45,41 +42,33 @@ class WebSocketKafkaConfig { ) } - @Bean("websocketConsumerFactory") - fun consumerFactory(@Qualifier("websocketConsumerConfig") consumerConfigs: Map): ConsumerFactory { + @Bean("eventConsumerFactory") + fun consumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } - @Bean("websocketProducerConfig") - fun producerConfigs(): Map { - return mapOf( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, - ProducerConfig.ACKS_CONFIG to "all" - ) - } - - @Bean("websocketProducerFactory") - fun producerFactory(@Qualifier("websocketProducerConfig") producerConfigs: Map): ProducerFactory { - return DefaultKafkaProducerFactory(producerConfigs) + @Bean("richTradeConsumerFactory") + fun richTradeConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) } - @Bean("websocketKafkaTemplate") - fun kafkaTemplate(@Qualifier("websocketProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { - return KafkaTemplate(producerFactory) + @Bean("richOrderConsumerFactory") + fun richOrderConsumerFactory(@Qualifier("consumerConfigs") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) } @Autowired @ConditionalOnBean(TradeKafkaListener::class) fun configureTradeListener( tradeListener: TradeKafkaListener, - @Qualifier("websocketConsumerFactory") consumerFactory: ConsumerFactory + template: KafkaTemplate, + @Qualifier("richTradeConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = 
ContainerProperties(Pattern.compile("richTrade")) containerProps.messageListener = tradeListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("WebsocketTradeKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "richTrade.DLT") container.start() } @@ -87,33 +76,23 @@ class WebSocketKafkaConfig { @ConditionalOnBean(OrderKafkaListener::class) fun configureOrderListener( orderListener: OrderKafkaListener, - @Qualifier("websocketConsumerFactory") consumerFactory: ConsumerFactory + template: KafkaTemplate, + @Qualifier("richOrderConsumerFactory") consumerFactory: ConsumerFactory ) { val containerProps = ContainerProperties(Pattern.compile("richOrder")) containerProps.messageListener = orderListener val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) container.setBeanName("WebsocketOrderKafkaListenerContainer") + container.commonErrorHandler = createConsumerErrorHandler(template, "richOrder.DLT") container.start() } - @Autowired - fun createTopics(applicationContext: GenericApplicationContext) { - applicationContext.registerBean("topic_richOrder", NewTopic::class.java, Supplier { - TopicBuilder.name("richOrder") - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) - - applicationContext.registerBean("topic_richTrade", NewTopic::class.java, Supplier { - TopicBuilder.name("richTrade") - .partitions(10) - .replicas(3) - .config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") - .build() - }) + private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler { + val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ -> + cr.headers().add("dlt-origin-module", "WEBSOCKET".toByteArray()) + TopicPartition(dltTopic, cr.partition()) + } + return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20)) } - } \ No newline at end of file