Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
package co.nilin.opex.accountant.ports.kafka.listener.config


import co.nilin.opex.accountant.ports.kafka.listener.consumer.EventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.OrderKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TempEventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TradeKafkaListener
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.support.GenericApplicationContext
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.*
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.util.backoff.FixedBackOff
import java.util.regex.Pattern

@Configuration
class AccountantKafkaConfig {

@Value("\${spring.kafka.bootstrap-servers}")
private val bootstrapServers: String? = null
private lateinit var bootstrapServers: String

@Value("\${spring.kafka.consumer.group-id}")
private val groupId: String? = null
private lateinit var groupId: String

@Bean("accountantConsumerConfig")
fun consumerConfigs(): Map<String, Any?> {
@Bean("consumerConfig")
fun consumerConfigs(): Map<String, Any> {
return mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to groupId,
Expand All @@ -45,66 +44,76 @@ class AccountantKafkaConfig {
}

@Bean("accountantConsumerFactory")
fun consumerFactory(@Qualifier("accountantConsumerConfig") consumerConfigs: Map<String, Any?>): ConsumerFactory<String, CoreEvent> {
fun consumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map<String, Any?>): ConsumerFactory<String, CoreEvent> {
return DefaultKafkaConsumerFactory(consumerConfigs)
}

@Autowired
@ConditionalOnBean(TradeKafkaListener::class)
fun configureTradeListener(
tradeListener: TradeKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("trades_.*"))
containerProps.messageListener = tradeListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("TradeKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "trades.DLT")
container.start()
}

@Autowired
@ConditionalOnBean(EventKafkaListener::class)
fun configureEventListener(
eventListener: EventKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("events_.*"))
containerProps.messageListener = eventListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("EventKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "events.DLT")
container.start()
}

@Autowired
@ConditionalOnBean(OrderKafkaListener::class)
fun configureOrderListener(
orderListener: OrderKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("orders_.*"))
containerProps.messageListener = orderListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("OrderKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "orders.DLT")
container.start()
}

@Autowired
fun createTempTopics(applicationContext: GenericApplicationContext) {
applicationContext.registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1)
}

@Autowired
@ConditionalOnBean(TempEventKafkaListener::class)
fun configureTempEventListener(
eventListener: TempEventKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("tempevents"))
containerProps.messageListener = eventListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("TempEventKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "tempevents.DLT")
container.start()
}

private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler {
val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ ->
cr.headers().add("dlt-origin-module", "ACCOUNTANT".toByteArray())
TopicPartition(dltTopic, cr.partition())
}
return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package co.nilin.opex.accountant.ports.kafka.submitter.config

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration
import org.springframework.context.support.GenericApplicationContext
import org.springframework.kafka.config.TopicBuilder
import java.util.function.Supplier

@Configuration
class KafkaTopicConfig {

@Autowired
fun createTopics(applicationContext: GenericApplicationContext) {
with(applicationContext) {
registerBean("topic_richOrder", NewTopic::class.java, Supplier {
TopicBuilder.name("richOrder")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})

registerBean("topic_richTrade", NewTopic::class.java, Supplier {
TopicBuilder.name("richTrade")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})

registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,24 @@ package co.nilin.opex.accountant.ports.kafka.submitter.config
import co.nilin.opex.accountant.core.inout.RichOrderEvent
import co.nilin.opex.accountant.core.inout.RichTrade
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.support.GenericApplicationContext
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer
import java.util.function.Supplier

@Configuration
class SubmitterKafkaConfig {

@Value("\${spring.kafka.bootstrap-servers}")
private lateinit var bootstrapServers: String

@Bean("accountantProducerConfigs")
@Bean("producerConfigs")
fun producerConfigs(): Map<String, Any> {
return mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
Expand All @@ -38,7 +32,7 @@ class SubmitterKafkaConfig {
}

@Bean("accountantEventProducerFactory")
fun producerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, CoreEvent> {
fun producerFactory(@Qualifier("producerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, CoreEvent> {
return DefaultKafkaProducerFactory(producerConfigs)
}

Expand All @@ -48,7 +42,7 @@ class SubmitterKafkaConfig {
}

@Bean("richTradeProducerFactory")
fun richTradeProducerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichTrade> {
fun richTradeProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichTrade> {
return DefaultKafkaProducerFactory(producerConfigs)
}

Expand All @@ -58,31 +52,12 @@ class SubmitterKafkaConfig {
}

@Bean("richOrderProducerFactory")
fun richOrderProducerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichOrderEvent> {
fun richOrderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichOrderEvent> {
return DefaultKafkaProducerFactory(producerConfigs)
}

@Bean("richOrderKafkaTemplate")
fun richOrderKafkaTemplate(@Qualifier("richOrderProducerFactory") producerFactory: ProducerFactory<String?, RichOrderEvent>): KafkaTemplate<String?, RichOrderEvent> {
return KafkaTemplate(producerFactory)
}

@Autowired
fun createTopics(applicationContext: GenericApplicationContext) {
applicationContext.registerBean("topic_richOrder", NewTopic::class.java, Supplier {
TopicBuilder.name("richOrder")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})

applicationContext.registerBean("topic_richTrade", NewTopic::class.java, Supplier {
TopicBuilder.name("richTrade")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.submitter.service

import co.nilin.opex.accountant.core.inout.RichOrderEvent
import co.nilin.opex.accountant.core.spi.RichOrderPublisher
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
Expand All @@ -13,13 +14,17 @@ import kotlin.coroutines.suspendCoroutine
class RichOrderSubmitter(@Qualifier("richOrderKafkaTemplate") val kafkaTemplate: KafkaTemplate<String, RichOrderEvent>) :
RichOrderPublisher {

private val logger = LoggerFactory.getLogger(RichOrderSubmitter::class.java)

override suspend fun publish(order: RichOrderEvent): Unit = suspendCoroutine { cont ->
println("richOrderSubmit!")
logger.info("Submitting RichOrder")

val sendFuture = kafkaTemplate.send("richOrder", order)
sendFuture.addCallback({ sendResult ->
sendFuture.addCallback({
cont.resume(Unit)
}, { exception ->
cont.resumeWithException(exception)
}, {
logger.info("Error submitting RichOrder", it)
cont.resumeWithException(it)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.submitter.service

import co.nilin.opex.accountant.core.inout.RichTrade
import co.nilin.opex.accountant.core.spi.RichTradePublisher
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
Expand All @@ -12,13 +13,18 @@ import kotlin.coroutines.suspendCoroutine
@Component
class RichTradeSubmitter(@Qualifier("richTradeKafkaTemplate") val kafkaTemplate: KafkaTemplate<String, RichTrade>) :
RichTradePublisher {

private val logger = LoggerFactory.getLogger(RichTradeSubmitter::class.java)

override suspend fun publish(trade: RichTrade): Unit = suspendCoroutine { cont ->
println("richTradeSubmit!")
logger.info("Submitting RichTrade event: id=${trade.id}")

val sendFuture = kafkaTemplate.send("richTrade", trade)
sendFuture.addCallback({ sendResult ->
sendFuture.addCallback({
cont.resume(Unit)
}, { exception ->
cont.resumeWithException(exception)
}, {
logger.error("RichTrade submitter error", it)
cont.resumeWithException(it)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.submitter.service

import co.nilin.opex.accountant.core.spi.TempEventRepublisher
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
Expand All @@ -12,14 +13,19 @@ import kotlin.coroutines.suspendCoroutine
@Component
class TempEventSubmitter(@Qualifier("accountantEventKafkaTemplate") val kafkaTemplate: KafkaTemplate<String, CoreEvent>) :
TempEventRepublisher {

private val logger = LoggerFactory.getLogger(TempEventSubmitter::class.java)

override suspend fun republish(events: List<CoreEvent>): Unit = suspendCoroutine { cont ->
println("accountantEventSubmit!")
logger.info("Submitting TempEvents")

events.forEach { event ->
val sendFuture = kafkaTemplate.send("tempevents", event)
sendFuture.addCallback({ sendResult ->
sendFuture.addCallback({
cont.resume(Unit)
}, { exception ->
cont.resumeWithException(exception)
}, {
logger.error("Error submitting TempEvents", it)
cont.resumeWithException(it)
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ class AdminKafkaEventPublisher(private val kafkaTemplate: KafkaTemplate<String?,

override suspend fun publish(event: AdminEvent): Unit = suspendCoroutine { cont ->
logger.info("Publishing admin event: $event")

val sendFuture = kafkaTemplate.send("admin_event", event)
sendFuture.addCallback({ cont.resume(Unit) }, { cont.resumeWithException(it) })
sendFuture.addCallback({
cont.resume(Unit)
}, {
logger.error("Error publishing admin event", it)
cont.resumeWithException(it)
})
}

}
Loading