Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
506513b
remove payment from enginx config
Marchosiax Dec 19, 2021
7953fb5
Add storage module's volume
ebrahimmfadae Jan 16, 2022
71af658
Fix storage volume address
ebrahimmfadae Jan 16, 2022
5b9271a
Merge pull request #173 from opexdev/issues/147/add-storage-volume
ebrahimmfadae Jan 16, 2022
3db78a4
close #175: Fix matching-engine's trade counter
Marchosiax Jan 17, 2022
62bbf31
Merge pull request #176 from opexdev/175-trade-id
ebrahimmfadae Jan 17, 2022
4500e65
174 upgrade versions (#179)
maryarm Jan 22, 2022
745daf6
Close #171 and #177: Kafka improvement (#180)
Marchosiax Jan 29, 2022
a4ffb96
Change java version to 11 (#185)
Marchosiax Jan 29, 2022
70228ff
Merge branch 'main' of github.com:opexdev/OPEX-Core into dev
Marchosiax Feb 2, 2022
a48a94e
#189: Add Hashicorp Vault as db credential source, smtp password is a…
maryarm Feb 5, 2022
e4cba7c
#199: move vault host config from docker to main profile (#192)
maryarm Feb 5, 2022
3f19798
Close #181, Remove nginx (#194)
ebrahimmfadae Feb 7, 2022
a05ebb1
Close #195, Fix maven build issues (#196)
ebrahimmfadae Feb 8, 2022
26c1907
Security issues resolved (#197)
Marchosiax Feb 9, 2022
e6b4994
Hostfix keycloak
ebrahimmfadae Feb 14, 2022
2f5a68e
Remove application-docker.yml and application-local.yml profiles
ebrahimmfadae Feb 16, 2022
6b69b5a
Extract keycloak frontend url configs
ebrahimmfadae Feb 16, 2022
60cb500
Remove mvnw and mvnw.cmd in all modules
ebrahimmfadae Feb 16, 2022
24f8d87
Fix postgres version
ebrahimmfadae Feb 16, 2022
f3ca226
Add resources folder
ebrahimmfadae Feb 16, 2022
98ec4dc
Fix vault config
ebrahimmfadae Feb 16, 2022
8f4d20a
Fix env variables in application.yml
ebrahimmfadae Feb 16, 2022
3eed4d4
Add migration config to keycloak
ebrahimmfadae Feb 16, 2022
e0adf93
Update keycloak url configs
ebrahimmfadae Feb 16, 2022
9d0c494
Close #200, Refactor deployment configs (#206)
ebrahimmfadae Feb 16, 2022
bc5542c
Fix opex-master-realm.json path in keycloak
ebrahimmfadae Feb 16, 2022
ac32e52
Merge branch 'issues/200/refactor-deployment-configs' into dev
ebrahimmfadae Feb 16, 2022
45f742f
Fix cert-url in websocket
ebrahimmfadae Feb 16, 2022
0ece496
Remove ; from end of postgres sql commands
ebrahimmfadae Feb 19, 2022
400e3a7
Add bitcoin reserved addresses
ebrahimmfadae Feb 19, 2022
42d5e06
Close #193, Inject database credentials as envs (#209)
ebrahimmfadae Feb 20, 2022
ff8ba48
Merge branch 'main' into dev
ebrahimmfadae Feb 22, 2022
9dea538
Remove repo.spring repositories
ebrahimmfadae Feb 22, 2022
9424d14
Close #210, Add PostgreSQL backup user (#211)
ebrahimmfadae Feb 22, 2022
8182d12
Close #208, Update vault use cases (#212)
ebrahimmfadae Feb 23, 2022
9cec3a8
docker-compose: Fix DB_BACKUP_PASS envs
ebrahimmfadae Feb 23, 2022
512a1d6
maven: Enable multi-thread build option
ebrahimmfadae Feb 23, 2022
f28c1db
maven: Enable multi-thread build option in dev
ebrahimmfadae Feb 23, 2022
88074f7
auth: Fix vault secret usage issue
ebrahimmfadae Feb 23, 2022
261b408
Revert `auth: Fix vault secret usage issue`
ebrahimmfadae Feb 23, 2022
f3cffdf
maven: Fix incompatible spring version in auth
ebrahimmfadae Feb 23, 2022
955c92b
maven: Increase spring version to 2.5.5 in auth
ebrahimmfadae Feb 23, 2022
0a54aa2
maven: Update spring cloud version in auth
ebrahimmfadae Feb 23, 2022
031bedf
maven: Update spring version to 2.4.5 in auth
ebrahimmfadae Feb 23, 2022
72b7c80
Close #205: Admin services (#213)
Marchosiax Feb 26, 2022
78a4c8f
Close #161, Add referral module (#216)
ebrahimmfadae Mar 5, 2022
841a82e
Close #215: Dead letter queue (#218)
Marchosiax Mar 6, 2022
e97703c
Keycloak security and profile services (#224)
Marchosiax Mar 29, 2022
7e16006
Close #221, Implement CAPTCHA service (#225)
ebrahimmfadae Apr 4, 2022
6811c02
referral: Set swagger auth url based on environment
ebrahimmfadae Apr 5, 2022
d54d6cf
Merge pull request #226 from opexdev/implement-swagger-auth-url
ebrahimmfadae Apr 5, 2022
519e52f
Change password and OTP fixed (#228)
Marchosiax Apr 6, 2022
b033d46
Fix keycloak issues (#229)
Marchosiax Apr 13, 2022
adee8a9
auth: Add captcha to custom controllers
ebrahimmfadae Apr 13, 2022
dc5cd49
auth: Add more logs to captcha
ebrahimmfadae Apr 13, 2022
85d0da9
auth: Fix captcha service url
ebrahimmfadae Apr 13, 2022
ac30424
auth: Remove redundant error logs
ebrahimmfadae Apr 13, 2022
51fdf98
auth: Add captcha fail log
ebrahimmfadae Apr 13, 2022
4109062
auth: Add info log to validate captcha
ebrahimmfadae Apr 13, 2022
b43b475
auth: Fix xForwardedFor issue in captcha
ebrahimmfadae Apr 13, 2022
3d81788
Update Keycloak config
Marchosiax Apr 17, 2022
f9b6e2e
Merge branch 'dev' of github.com:opexdev/OPEX-Core into dev
Marchosiax Apr 17, 2022
c62f03e
Fix Jackson java 8 time issue
Marchosiax Apr 17, 2022
928175d
Fix wallet issues
Marchosiax Apr 18, 2022
62bfda9
Fix session empty agent
Marchosiax Apr 18, 2022
4adeded
Add pagination and fix session bug (#231)
Marchosiax Apr 20, 2022
18d450f
Add pagination for group members
Marchosiax Apr 20, 2022
474eab1
Release v1.0 beta (#232)
ebrahimmfadae Apr 20, 2022
7ebf700
Remove username in user event dto
Marchosiax Apr 20, 2022
5d88b11
Remove username in user registration dto
Marchosiax Apr 20, 2022
bd62a20
captcha: Fix character length
ebrahimmfadae Apr 20, 2022
aecbba2
captcha: Fix image size
ebrahimmfadae Apr 20, 2022
2456502
captcha: Fine tune image size
ebrahimmfadae Apr 20, 2022
8e42012
bc-gateway: Define test nets
ebrahimmfadae Apr 20, 2022
8b2b2cd
bc-gateway: Fix defining ids
ebrahimmfadae Apr 20, 2022
2555b64
wallet: Update currency gift configuration (#233)
ebrahimmfadae Apr 20, 2022
adb6d55
Fix test asset declarations
ebrahimmfadae Apr 20, 2022
de02ed3
Increased buffer size for web client
Marchosiax Apr 23, 2022
994aebc
bc-gateway: Define test bitcoin address type
ebrahimmfadae Apr 24, 2022
6d598ee
vault: Fix backup database secret
ebrahimmfadae Apr 24, 2022
3884968
bc-gateway: Fix data.sql invalid syntax
ebrahimmfadae Apr 24, 2022
6479e71
Admin Improvements (#234)
Marchosiax Apr 24, 2022
44db5e5
vault: Add VANDAR_API_KEY env
ebrahimmfadae Apr 25, 2022
a4110cc
Fix assigned address not found when memo is empty
Marchosiax Apr 25, 2022
e538fc4
Merge remote-tracking branch 'origin/dev' into dev
Marchosiax Apr 25, 2022
b7d4627
Changed broken assign address query
Marchosiax Apr 25, 2022
3479828
bc-gateway: Fix USDT on ropsten
ebrahimmfadae Apr 25, 2022
30a7264
Lower case when assigning address
Marchosiax Apr 25, 2022
1e0ee1d
Merge remote-tracking branch 'origin/dev' into dev
Marchosiax Apr 25, 2022
f6ff856
Add group to user details for admin
Marchosiax Apr 25, 2022
e18bed6
Removed lowercase for assigning address
Marchosiax Apr 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
8 changes: 7 additions & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pipeline {
withMaven(
maven: 'maven-3.6.3'
) {
sh 'mvn -B clean install'
sh 'mvn -T 1C -B clean install'
}
}
}
Expand All @@ -20,8 +20,14 @@ pipeline {
SMTP_PASS = credentials("smtp-secret")
DB_USER = 'opex'
DB_PASS = credentials("db-secret")
DB_BACKUP_USER = 'opex_backup'
DB_BACKUP_PASS = credentials("db-backup-secret")
KEYCLOAK_ADMIN_URL = 'https://demo.opex.dev/auth'
KEYCLOAK_FRONTEND_URL = 'https://demo.opex.dev/auth'
KEYCLOAK_ADMIN_USERNAME = credentials("keycloak-admin-username")
KEYCLOAK_ADMIN_PASSWORD = credentials("keycloak-admin-password")
OPEX_ADMIN_KEYCLOAK_CLIENT_SECRET = credentials("opex-admin-keycloak-client-secret")
VANDAR_API_KEY = credentials("vandar-api-key")
COMPOSE_PROJECT_NAME = 'demo-core'
DEFAULT_NETWORK_NAME = 'demo-opex'
}
Expand Down
3 changes: 0 additions & 3 deletions accountant/accountant-app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ spring:
bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092}
consumer:
group-id: accountant
redis:
hostname: ${REDIS_HOST:localhost}
port: 6379
r2dbc:
url: r2dbc:postgresql://${DB_IP_PORT:localhost}/opex_accountant
username: ${dbusername:opex}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ class PairConfig(
val leftSideWalletSymbol: String, //can be same as pair left side
val rightSideWalletSymbol: String, //can be same as pair right side
val leftSideFraction: Double,
val rightSideFraction: Double
val rightSideFraction: Double,
val rate: Double = 0.0
)
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
package co.nilin.opex.accountant.ports.kafka.listener.config


import co.nilin.opex.accountant.ports.kafka.listener.consumer.EventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.OrderKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TempEventKafkaListener
import co.nilin.opex.accountant.ports.kafka.listener.consumer.TradeKafkaListener
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.support.GenericApplicationContext
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.*
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.util.backoff.FixedBackOff
import java.util.regex.Pattern

@Configuration
class AccountantKafkaConfig {

@Value("\${spring.kafka.bootstrap-servers}")
private val bootstrapServers: String? = null
private lateinit var bootstrapServers: String

@Value("\${spring.kafka.consumer.group-id}")
private val groupId: String? = null
private lateinit var groupId: String

@Bean("accountantConsumerConfig")
fun consumerConfigs(): Map<String, Any?> {
@Bean("consumerConfig")
fun consumerConfigs(): Map<String, Any> {
return mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to groupId,
Expand All @@ -45,66 +44,76 @@ class AccountantKafkaConfig {
}

@Bean("accountantConsumerFactory")
fun consumerFactory(@Qualifier("accountantConsumerConfig") consumerConfigs: Map<String, Any?>): ConsumerFactory<String, CoreEvent> {
fun consumerFactory(@Qualifier("consumerConfig") consumerConfigs: Map<String, Any?>): ConsumerFactory<String, CoreEvent> {
return DefaultKafkaConsumerFactory(consumerConfigs)
}

@Autowired
@ConditionalOnBean(TradeKafkaListener::class)
fun configureTradeListener(
tradeListener: TradeKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("trades_.*"))
containerProps.messageListener = tradeListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("TradeKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "trades.DLT")
container.start()
}

@Autowired
@ConditionalOnBean(EventKafkaListener::class)
fun configureEventListener(
eventListener: EventKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("events_.*"))
containerProps.messageListener = eventListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("EventKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "events.DLT")
container.start()
}

@Autowired
@ConditionalOnBean(OrderKafkaListener::class)
fun configureOrderListener(
orderListener: OrderKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("orders_.*"))
containerProps.messageListener = orderListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("OrderKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "orders.DLT")
container.start()
}

@Autowired
fun createTempTopics(applicationContext: GenericApplicationContext) {
applicationContext.registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1)
}

@Autowired
@ConditionalOnBean(TempEventKafkaListener::class)
fun configureTempEventListener(
eventListener: TempEventKafkaListener,
@Qualifier("accountantEventKafkaTemplate") template: KafkaTemplate<String?, CoreEvent>,
@Qualifier("accountantConsumerFactory") consumerFactory: ConsumerFactory<String, CoreEvent>
) {
val containerProps = ContainerProperties(Pattern.compile("tempevents"))
containerProps.messageListener = eventListener
val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps)
container.setBeanName("TempEventKafkaListenerContainer")
container.commonErrorHandler = createConsumerErrorHandler(template, "tempevents.DLT")
container.start()
}

private fun createConsumerErrorHandler(kafkaTemplate: KafkaTemplate<*, *>, dltTopic: String): CommonErrorHandler {
val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { cr, _ ->
cr.headers().add("dlt-origin-module", "ACCOUNTANT".toByteArray())
TopicPartition(dltTopic, cr.partition())
}
return DefaultErrorHandler(recoverer, FixedBackOff(5_000, 20))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,4 @@
<scope>test</scope>
</dependency>
</dependencies>

<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ VALUES ('btc_usdt', 'btc', 'usdt', 0.000001, 0.01, 55000),
('nln_btc', 'nln', 'btc', 1.0, 0.000001, 1 / 5500000)
ON CONFLICT DO NOTHING;

-- Test pairs
INSERT INTO pair_config
VALUES ('tbtc_tusdt', 'tbtc', 'tusdt', 0.000001, 0.01, 55000),
('teth_tusdt', 'teth', 'tusdt', 0.00001, 0.01, 3800),
('nln_tusdt', 'nln', 'tusdt', 1.0, 0.01, 0.01),
('nln_tbtc', 'nln', 'tbtc', 1.0, 0.000001, 1 / 5500000)
ON CONFLICT DO NOTHING;

INSERT INTO pair_fee_config
VALUES (1, 'btc_usdt', 'ASK', '*', 0.01, 0.01),
(2, 'btc_usdt', 'BID', '*', 0.01, 0.01),
Expand All @@ -16,6 +24,18 @@ VALUES (1, 'btc_usdt', 'ASK', '*', 0.01, 0.01),
(8, 'eth_usdt', 'BID', '*', 0.01, 0.01)
ON CONFLICT DO NOTHING;

-- Test pair configs
INSERT INTO pair_fee_config
VALUES (9, 'tbtc_tusdt', 'ASK', '*', 0.01, 0.01),
(10, 'tbtc_tusdt', 'BID', '*', 0.01, 0.01),
(11, 'nln_tusdt', 'ASK', '*', 0.01, 0.01),
(12, 'nln_tusdt', 'BID', '*', 0.01, 0.01),
(13, 'nln_tbtc', 'ASK', '*', 0.01, 0.01),
(14, 'nln_tbtc', 'BID', '*', 0.01, 0.01),
(15, 'teth_tusdt', 'ASK', '*', 0.01, 0.01),
(16, 'teth_tusdt', 'BID', '*', 0.01, 0.01)
ON CONFLICT DO NOTHING;

SELECT setval(pg_get_serial_sequence('pair_fee_config', 'id'), (SELECT MAX(id) FROM pair_fee_config));

COMMIT;
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,4 @@
<scope>test</scope>
</dependency>
</dependencies>

<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package co.nilin.opex.accountant.ports.kafka.submitter.config

import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.config.TopicConfig
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Configuration
import org.springframework.context.support.GenericApplicationContext
import org.springframework.kafka.config.TopicBuilder
import java.util.function.Supplier

@Configuration
class KafkaTopicConfig {

@Autowired
fun createTopics(applicationContext: GenericApplicationContext) {
with(applicationContext) {
registerBean("topic_richOrder", NewTopic::class.java, Supplier {
TopicBuilder.name("richOrder")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})

registerBean("topic_richTrade", NewTopic::class.java, Supplier {
TopicBuilder.name("richTrade")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})

registerBean("topic_tempevents", NewTopic::class.java, "tempevents", 1, 1)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,24 @@ package co.nilin.opex.accountant.ports.kafka.submitter.config
import co.nilin.opex.accountant.core.inout.RichOrderEvent
import co.nilin.opex.accountant.core.inout.RichTrade
import co.nilin.opex.matching.engine.core.eventh.events.CoreEvent
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.support.GenericApplicationContext
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer
import java.util.function.Supplier

@Configuration
class SubmitterKafkaConfig {

@Value("\${spring.kafka.bootstrap-servers}")
private lateinit var bootstrapServers: String

@Bean("accountantProducerConfigs")
@Bean("producerConfigs")
fun producerConfigs(): Map<String, Any> {
return mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
Expand All @@ -38,7 +32,7 @@ class SubmitterKafkaConfig {
}

@Bean("accountantEventProducerFactory")
fun producerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, CoreEvent> {
fun producerFactory(@Qualifier("producerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, CoreEvent> {
return DefaultKafkaProducerFactory(producerConfigs)
}

Expand All @@ -48,7 +42,7 @@ class SubmitterKafkaConfig {
}

@Bean("richTradeProducerFactory")
fun richTradeProducerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichTrade> {
fun richTradeProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichTrade> {
return DefaultKafkaProducerFactory(producerConfigs)
}

Expand All @@ -58,31 +52,12 @@ class SubmitterKafkaConfig {
}

@Bean("richOrderProducerFactory")
fun richOrderProducerFactory(@Qualifier("accountantProducerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichOrderEvent> {
fun richOrderProducerFactory(@Qualifier("producerConfigs") producerConfigs: Map<String, Any>): ProducerFactory<String?, RichOrderEvent> {
return DefaultKafkaProducerFactory(producerConfigs)
}

@Bean("richOrderKafkaTemplate")
fun richOrderKafkaTemplate(@Qualifier("richOrderProducerFactory") producerFactory: ProducerFactory<String?, RichOrderEvent>): KafkaTemplate<String?, RichOrderEvent> {
return KafkaTemplate(producerFactory)
}

@Autowired
fun createTopics(applicationContext: GenericApplicationContext) {
applicationContext.registerBean("topic_richOrder", NewTopic::class.java, Supplier {
TopicBuilder.name("richOrder")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})

applicationContext.registerBean("topic_richTrade", NewTopic::class.java, Supplier {
TopicBuilder.name("richTrade")
.partitions(10)
.replicas(3)
.config(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
.build()
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package co.nilin.opex.accountant.ports.kafka.submitter.service

import co.nilin.opex.accountant.core.inout.RichOrderEvent
import co.nilin.opex.accountant.core.spi.RichOrderPublisher
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
Expand All @@ -13,13 +14,17 @@ import kotlin.coroutines.suspendCoroutine
class RichOrderSubmitter(@Qualifier("richOrderKafkaTemplate") val kafkaTemplate: KafkaTemplate<String, RichOrderEvent>) :
RichOrderPublisher {

private val logger = LoggerFactory.getLogger(RichOrderSubmitter::class.java)

override suspend fun publish(order: RichOrderEvent): Unit = suspendCoroutine { cont ->
println("richOrderSubmit!")
logger.info("Submitting RichOrder")

val sendFuture = kafkaTemplate.send("richOrder", order)
sendFuture.addCallback({ sendResult ->
sendFuture.addCallback({
cont.resume(Unit)
}, { exception ->
cont.resumeWithException(exception)
}, {
logger.info("Error submitting RichOrder", it)
cont.resumeWithException(it)
})
}
}
Loading