diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 3b310f08..38080b60 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -1,4 +1,4 @@
-name: Maven Build Artifact
+name: Build Maven Artifact
on:
pull_request:
@@ -7,4 +7,4 @@ on:
jobs:
build:
- uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v1
+ uses: valitydev/base-workflow/.github/workflows/maven-service-build.yml@v2-beta
diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 5a405cb6..d464b920 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -1,4 +1,4 @@
-name: Maven Deploy Artifact
+name: Deploy Docker Image
on:
push:
@@ -7,13 +7,9 @@ on:
- 'main'
- 'epic/**'
-env:
- REGISTRY: ghcr.io
- IMAGE_NAME: ${{ github.repository }}
-
jobs:
- deploy:
- uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v1
+ build-and-deploy:
+ uses: valitydev/base-workflow/.github/workflows/maven-service-deploy.yml@v2-beta
secrets:
github-token: ${{ secrets.GITHUB_TOKEN }}
mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }}
diff --git a/pom.xml b/pom.xml
index 5ec32881..c4ab43a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
dev.vality
service-parent-pom
- 1.0.18
+ 2.0.0-BETA-11
newway
@@ -177,6 +177,11 @@
limiter-proto
1.32-6158184
+
+ dev.vality
+ exrates-proto
+ 1.3-875328b
+
dev.vality
shared-resources
@@ -202,6 +207,12 @@
1.4.1
test
+
+ org.awaitility
+ awaitility
+ 4.2.0
+ test
+
diff --git a/src/main/java/dev/vality/newway/config/KafkaConfig.java b/src/main/java/dev/vality/newway/config/KafkaConfig.java
index 9f602da0..df10d067 100644
--- a/src/main/java/dev/vality/newway/config/KafkaConfig.java
+++ b/src/main/java/dev/vality/newway/config/KafkaConfig.java
@@ -1,8 +1,10 @@
package dev.vality.newway.config;
+import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.config.properties.KafkaConsumerProperties;
+import dev.vality.newway.serde.CurrencyExchangeRateEventDeserializer;
import dev.vality.newway.serde.PayoutEventDeserializer;
import dev.vality.newway.serde.SinkEventDeserializer;
import dev.vality.payout.manager.Event;
@@ -33,6 +35,9 @@ public class KafkaConfig {
@Value("${kafka.topics.party-management.consumer.group-id}")
private String partyConsumerGroup;
+ @Value("${kafka.topics.exrate.consumer.group-id}")
+ private String exrateConsumerGroup;
+
@Bean
public Map consumerConfigs() {
return createConsumerConfig();
@@ -136,9 +141,20 @@ public KafkaListenerContainerFactory> createConcurrentFactory(
- ConsumerFactory consumerFactory, int threadsNumber) {
- ConcurrentKafkaListenerContainerFactory factory =
+ @Bean
+ public KafkaListenerContainerFactory> exchangeRateContainerFactory() {
+ Map props = kafkaProperties.buildConsumerProperties();
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CurrencyExchangeRateEventDeserializer.class);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, exrateConsumerGroup);
+ ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(props);
+
+ return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getExrateConcurrency());
+ }
+
+ private KafkaListenerContainerFactory> createConcurrentFactory(
+ ConsumerFactory consumerFactory, int threadsNumber) {
+ ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
initFactory(consumerFactory, threadsNumber, factory);
return factory;
diff --git a/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java b/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java
index a4dab743..b6536a6f 100644
--- a/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java
+++ b/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java
@@ -25,5 +25,6 @@ public class KafkaConsumerProperties {
private int destinationConcurrency;
private int withdrawalSessionConcurrency;
private int limitConfigConcurrency;
+ private int exrateConcurrency;
}
diff --git a/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java b/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java
new file mode 100644
index 00000000..19bda164
--- /dev/null
+++ b/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java
@@ -0,0 +1,12 @@
+package dev.vality.newway.dao.exrate.iface;
+
+import dev.vality.dao.DaoException;
+import dev.vality.dao.GenericDao;
+import dev.vality.newway.domain.tables.pojos.ExRate;
+
+import java.util.List;
+
+public interface ExchangeRateDao extends GenericDao {
+ void saveBatch(List exchangeRates) throws DaoException;
+ ExRate findBySourceSymbolicCode(String symbolicCode);
+}
diff --git a/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java
new file mode 100644
index 00000000..22474563
--- /dev/null
+++ b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java
@@ -0,0 +1,48 @@
+package dev.vality.newway.dao.exrate.impl;
+
+import dev.vality.dao.DaoException;
+import dev.vality.dao.impl.AbstractGenericDao;
+import dev.vality.mapper.RecordRowMapper;
+import dev.vality.newway.dao.exrate.iface.ExchangeRateDao;
+import dev.vality.newway.domain.tables.pojos.ExRate;
+import org.jooq.Query;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.stereotype.Component;
+
+import javax.sql.DataSource;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static dev.vality.newway.domain.tables.ExRate.EX_RATE;
+
+@Component
+public class ExchangeRateDaoImpl extends AbstractGenericDao implements ExchangeRateDao {
+
+ private final RowMapper rowMapper;
+
+ @Autowired
+ public ExchangeRateDaoImpl(@Qualifier("dataSource") DataSource dataSource) {
+ super(dataSource);
+ this.rowMapper = new RecordRowMapper<>(EX_RATE, ExRate.class);
+ }
+
+ @Override
+ public void saveBatch(List exchangeRates) throws DaoException {
+ List queryList = exchangeRates.stream()
+ .map(exrate -> getDslContext().newRecord(EX_RATE, exrate))
+ .map(record -> (Query) getDslContext().insertInto(EX_RATE).set(record)
+ .onConflict(EX_RATE.EVENT_ID)
+ .doNothing())
+ .collect(Collectors.toList());
+ batchExecute(queryList);
+ }
+
+ @Override
+ public ExRate findBySourceSymbolicCode(String symbolicCode) {
+ Query query = getDslContext().selectFrom(EX_RATE)
+ .where(EX_RATE.SOURCE_CURRENCY_SYMBOLIC_CODE.eq(symbolicCode));
+ return fetchOne(query, rowMapper);
+ }
+}
diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java
new file mode 100644
index 00000000..3adec16d
--- /dev/null
+++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java
@@ -0,0 +1,47 @@
+package dev.vality.newway.handler.event.stock.impl.exrate;
+
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.exrates.events.CurrencyExchangeRate;
+import dev.vality.geck.common.util.TypeUtil;
+import dev.vality.newway.dao.exrate.iface.ExchangeRateDao;
+import dev.vality.newway.domain.tables.pojos.ExRate;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class CurrencyExchangeRateHandler implements ExchangeRateHandler {
+
+ private final ExchangeRateDao exchangeRateDao;
+
+ @Override
+ public void handle(List events) {
+ List exrates = events.stream().map(currencyEvent -> {
+ CurrencyExchangeRate exchangeRate = currencyEvent.getPayload().getExchangeRate();
+ ExRate exrate = new ExRate();
+ exrate.setEventId(UUID.fromString(currencyEvent.getEventId()));
+ exrate.setEventCreatedAt(TypeUtil.stringToLocalDateTime(currencyEvent.getEventCreatedAt()));
+ exrate.setSourceCurrencySymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode());
+ exrate.setSourceCurrencyExponent(exchangeRate.getSourceCurrency().getExponent());
+ exrate.setDestinationCurrencySymbolicCode(exchangeRate.getDestinationCurrency().getSymbolicCode());
+ exrate.setDestinationCurrencyExponent(exchangeRate.getDestinationCurrency().getExponent());
+ exrate.setRationalP(exchangeRate.getExchangeRate().getP());
+ exrate.setRationalQ(exchangeRate.getExchangeRate().getQ());
+ exrate.setRateTimestamp(TypeUtil.stringToLocalDateTime(exchangeRate.timestamp));
+ return exrate;
+ }).collect(Collectors.toList());
+
+ exchangeRateDao.saveBatch(exrates);
+ }
+
+ @Override
+ public boolean isHandle(CurrencyEvent currencyEvent) {
+ return currencyEvent.payload.isSetExchangeRate();
+ }
+}
diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java
new file mode 100644
index 00000000..006df3ae
--- /dev/null
+++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java
@@ -0,0 +1,12 @@
+package dev.vality.newway.handler.event.stock.impl.exrate;
+
+import dev.vality.exrates.events.CurrencyEvent;
+
+import java.util.List;
+
+public interface ExchangeRateHandler {
+
+ void handle(List currencyEvents);
+
+ boolean isHandle(CurrencyEvent currencyEvent);
+}
diff --git a/src/main/java/dev/vality/newway/listener/ExchangeRateListener.java b/src/main/java/dev/vality/newway/listener/ExchangeRateListener.java
new file mode 100644
index 00000000..7fb72020
--- /dev/null
+++ b/src/main/java/dev/vality/newway/listener/ExchangeRateListener.java
@@ -0,0 +1,36 @@
+package dev.vality.newway.listener;
+
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.newway.service.ExchangeRateService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@RequiredArgsConstructor
+@Service
+public class ExchangeRateListener {
+
+ private final ExchangeRateService exchangeRateService;
+
+ @KafkaListener(
+ autoStartup = "${kafka.topics.exrate.enabled}",
+ topics = "${kafka.topics.exrate.id}",
+ containerFactory = "exchangeRateContainerFactory")
+ public void handle(List> messages, Acknowledgment ack) {
+ log.info("Got ExchangeRate messages batch with size: {}", messages.size());
+ exchangeRateService.handleEvents(
+ messages.stream()
+ .map(ConsumerRecord::value)
+ .collect(Collectors.toList())
+ );
+ ack.acknowledge();
+ log.info("Batch ExchangeRate has been committed, size={}", messages.size());
+ }
+}
diff --git a/src/main/java/dev/vality/newway/serde/CurrencyExchangeRateEventDeserializer.java b/src/main/java/dev/vality/newway/serde/CurrencyExchangeRateEventDeserializer.java
new file mode 100644
index 00000000..53096d9b
--- /dev/null
+++ b/src/main/java/dev/vality/newway/serde/CurrencyExchangeRateEventDeserializer.java
@@ -0,0 +1,11 @@
+package dev.vality.newway.serde;
+
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.kafka.common.serialization.AbstractThriftDeserializer;
+
+public class CurrencyExchangeRateEventDeserializer extends AbstractThriftDeserializer {
+ @Override
+ public CurrencyEvent deserialize(String topic, byte[] data) {
+ return deserialize(data, new CurrencyEvent());
+ }
+}
diff --git a/src/main/java/dev/vality/newway/service/ExchangeRateService.java b/src/main/java/dev/vality/newway/service/ExchangeRateService.java
new file mode 100644
index 00000000..1f9153ab
--- /dev/null
+++ b/src/main/java/dev/vality/newway/service/ExchangeRateService.java
@@ -0,0 +1,29 @@
+package dev.vality.newway.service;
+
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.newway.handler.event.stock.impl.exrate.ExchangeRateHandler;
+import lombok.RequiredArgsConstructor;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Service
+@RequiredArgsConstructor
+public class ExchangeRateService {
+
+ private final List exchangeRateHandlers;
+
+ @Transactional(propagation = Propagation.REQUIRED)
+ public void handleEvents(List events) {
+ events.stream()
+ .collect(Collectors.groupingBy(
+ currencyEvent -> exchangeRateHandlers.stream()
+ .filter(exchangeRateHandler -> exchangeRateHandler.isHandle(currencyEvent))
+ .findAny().orElseThrow())
+ ).forEach(ExchangeRateHandler::handle);
+ }
+
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 1ac43272..545201fc 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -67,6 +67,7 @@ kafka:
destination-concurrency: 7
withdrawal-session-concurrency: 7
limit-config-concurrency: 7
+ exrate-concurrency: 7
topics:
invoice:
id: mg-invoice-100-2
@@ -108,6 +109,10 @@ kafka:
limit-config:
id: mg-events-lim-config
enabled: false
+ exrate:
+ id: etl-exchange-rate
+ enabled: false
+ consumer.group-id: "daway-exrate"
dmt:
url: http://dominant:8022/v1/domain/repository
diff --git a/src/main/resources/db/migration/V7__add_new_exchange_rates.sql b/src/main/resources/db/migration/V7__add_new_exchange_rates.sql
new file mode 100644
index 00000000..4a06012c
--- /dev/null
+++ b/src/main/resources/db/migration/V7__add_new_exchange_rates.sql
@@ -0,0 +1,19 @@
+CREATE TABLE dw.ex_rate
+(
+ id BIGSERIAL NOT NULL,
+ event_id uuid UNIQUE NOT NULL,
+ event_created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ source_currency_symbolic_code CHARACTER VARYING NOT NULL,
+ source_currency_exponent SMALLINT NOT NULL,
+ destination_currency_symbolic_code CHARACTER VARYING NOT NULL,
+ destination_currency_exponent SMALLINT NOT NULL,
+ rational_p BIGINT NOT NULL,
+ rational_q BIGINT NOT NULL,
+ rate_timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
+);
+
+CREATE INDEX rate_timestamp_idx ON dw.ex_rate (rate_timestamp);
+
+CREATE INDEX source_currency_sc_destination_currency_sc_timestamp_idx ON dw.ex_rate (source_currency_symbolic_code,
+ destination_currency_symbolic_code,
+ rate_timestamp);
diff --git a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java
index 21722a42..88f9bf01 100644
--- a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java
+++ b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java
@@ -28,7 +28,8 @@
"kafka.topics.source.enabled=true",
"kafka.topics.destination.enabled=true",
"kafka.topics.pm-events-payout.enabled=true",
- "kafka.topics.limit-config.enabled=true"},
+ "kafka.topics.limit-config.enabled=true",
+ "kafka.topics.exrate.enabled=true"},
topicsKeys = {
"kafka.topics.invoice.id",
"kafka.topics.recurrent-payment-tool.id",
@@ -42,7 +43,10 @@
"kafka.topics.source.id",
"kafka.topics.destination.id",
"kafka.topics.pm-events-payout.id",
- "kafka.topics.limit-config.id"})
+ "kafka.topics.limit-config.id",
+ "kafka.topics.limit-config.id",
+ "kafka.topics.exrate.id"}
+)
@DefaultSpringBootTest
@Import(KafkaProducer.class)
public @interface KafkaPostgresqlSpringBootITest {
diff --git a/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java b/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java
new file mode 100644
index 00000000..e1db09f9
--- /dev/null
+++ b/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java
@@ -0,0 +1,89 @@
+package dev.vality.newway.kafka;
+
+import dev.vality.exrates.base.Rational;
+import dev.vality.exrates.events.Currency;
+import dev.vality.exrates.events.CurrencyEvent;
+import dev.vality.exrates.events.CurrencyEventPayload;
+import dev.vality.exrates.events.CurrencyExchangeRate;
+import dev.vality.geck.common.util.TypeUtil;
+import dev.vality.newway.config.KafkaPostgresqlSpringBootITest;
+import dev.vality.newway.dao.exrate.iface.ExchangeRateDao;
+import dev.vality.newway.domain.tables.pojos.ExRate;
+import dev.vality.newway.service.ExchangeRateService;
+import org.apache.thrift.TBase;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.mock.mockito.SpyBean;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+@KafkaPostgresqlSpringBootITest
+public class ExchangeRateKafkaListenerTest {
+
+ @Value("${kafka.topics.exrate.id}")
+ public String topic;
+
+ @Autowired
+ private dev.vality.testcontainers.annotations.kafka.config.KafkaProducer> testThriftKafkaProducer;
+
+ @SpyBean
+ private ExchangeRateService exchangeRateService;
+
+ @Autowired
+ private ExchangeRateDao exchangeRateDao;
+
+ @Test
+ public void listenExchangeRateEventTest() {
+ // Given
+ CurrencyEvent currencyEvent = buildCurrencyEvent();
+ CurrencyExchangeRate exchangeRate = currencyEvent.payload.getExchangeRate();
+
+ // When
+ testThriftKafkaProducer.send(topic, currencyEvent);
+ await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ return exchangeRateDao.findBySourceSymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()) != null;
+ });
+ ExRate exrate = exchangeRateDao.findBySourceSymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode());
+
+ // Then
+ verify(exchangeRateService, timeout(TimeUnit.MINUTES.toMillis(1)).times(1)).handleEvents(anyList());
+ assertEquals(exchangeRate.getSourceCurrency().getSymbolicCode(), exrate.getSourceCurrencySymbolicCode());
+ assertEquals(exchangeRate.getSourceCurrency().getExponent(), exrate.getSourceCurrencyExponent());
+ assertEquals(exchangeRate.getDestinationCurrency().getSymbolicCode(), exrate.getDestinationCurrencySymbolicCode());
+ assertEquals(exchangeRate.getDestinationCurrency().getExponent(), exrate.getDestinationCurrencyExponent());
+ assertEquals(exchangeRate.getExchangeRate().p, exrate.getRationalP());
+ assertEquals(exchangeRate.getExchangeRate().q, exrate.getRationalQ());
+ assertEquals(TypeUtil.stringToLocalDateTime(exchangeRate.getTimestamp()).truncatedTo(ChronoUnit.SECONDS),
+ exrate.getRateTimestamp().truncatedTo(ChronoUnit.SECONDS));
+ }
+
+ private CurrencyEvent buildCurrencyEvent() {
+ CurrencyEvent currencyEvent = new CurrencyEvent();
+ currencyEvent.setEventId(UUID.randomUUID().toString());
+ currencyEvent.setEventCreatedAt(TypeUtil.temporalToString(LocalDateTime.now()));
+ currencyEvent.setPayload(
+ CurrencyEventPayload.exchange_rate(
+ new CurrencyExchangeRate(
+ new Currency("USD", (short) 2),
+ new Currency("RUB", (short) 2),
+ new Rational(60797502, 1000000),
+ TypeUtil.temporalToString(Instant.now())
+ )
+ )
+ );
+
+ return currencyEvent;
+ }
+
+}