From f6537ca2e822773da374ab3f04eb74f176071dc9 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Oct 2022 14:32:43 +0300 Subject: [PATCH 1/5] add exrate listener bump service parent pom --- .github/workflows/build.yml | 6 +- .github/workflows/deploy.yml | 10 +-- pom.xml | 13 ++- .../dev/vality/newway/config/KafkaConfig.java | 22 ++++- .../properties/KafkaConsumerProperties.java | 1 + .../dao/exrate/iface/ExchangeRateDao.java | 10 +++ .../dao/exrate/impl/ExchangeRateDaoImpl.java | 45 ++++++++++ .../exrate/CurrencyExchangeRateHandler.java | 51 +++++++++++ .../impl/exrate/ExchangeRateHandler.java | 10 +++ .../newway/listener/ExchangeRateListener.java | 36 ++++++++ ...CurrencyExchangeRateEventDeserializer.java | 11 +++ .../newway/service/ExchangeRateService.java | 27 ++++++ src/main/resources/application.yml | 5 ++ .../migration/V7__add_new_exchange_rates.sql | 19 ++++ .../KafkaPostgresqlSpringBootITest.java | 9 +- .../kafka/ExchangeRateKafkaListenerTest.java | 89 +++++++++++++++++++ 16 files changed, 348 insertions(+), 16 deletions(-) create mode 100644 src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java create mode 100644 src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java create mode 100644 src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java create mode 100644 src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java create mode 100644 src/main/java/dev/vality/newway/listener/ExchangeRateListener.java create mode 100644 src/main/java/dev/vality/newway/serde/CurrencyExchangeRateEventDeserializer.java create mode 100644 src/main/java/dev/vality/newway/service/ExchangeRateService.java create mode 100644 src/main/resources/db/migration/V7__add_new_exchange_rates.sql create mode 100644 src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3b310f08..d36f85c5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,10 +1,10 @@ -name: Maven Build Artifact +name: Build Maven Artifact on: pull_request: branches: - - '**' + - '*' jobs: build: - uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v1 + uses: valitydev/base-workflow/.github/workflows/maven-service-build.yml@v2-beta diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 5a405cb6..d464b920 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -1,4 +1,4 @@ -name: Maven Deploy Artifact +name: Deploy Docker Image on: push: @@ -7,13 +7,9 @@ on: - 'main' - 'epic/**' -env: - REGISTRY: ghcr.io - IMAGE_NAME: ${{ github.repository }} - jobs: - deploy: - uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v1 + build-and-deploy: + uses: valitydev/base-workflow/.github/workflows/maven-service-deploy.yml@v2-beta secrets: github-token: ${{ secrets.GITHUB_TOKEN }} mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }} diff --git a/pom.xml b/pom.xml index 5ec32881..c4ab43a2 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ dev.vality service-parent-pom - 1.0.18 + 2.0.0-BETA-11 newway @@ -177,6 +177,11 @@ limiter-proto 1.32-6158184 + + dev.vality + exrates-proto + 1.3-875328b + dev.vality shared-resources @@ -202,6 +207,12 @@ 1.4.1 test + + org.awaitility + awaitility + 4.2.0 + test + diff --git a/src/main/java/dev/vality/newway/config/KafkaConfig.java b/src/main/java/dev/vality/newway/config/KafkaConfig.java index 9f602da0..df10d067 100644 --- a/src/main/java/dev/vality/newway/config/KafkaConfig.java +++ b/src/main/java/dev/vality/newway/config/KafkaConfig.java @@ -1,8 +1,10 @@ package dev.vality.newway.config; +import dev.vality.exrates.events.CurrencyEvent; import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory; import dev.vality.machinegun.eventsink.MachineEvent; import dev.vality.newway.config.properties.KafkaConsumerProperties; +import dev.vality.newway.serde.CurrencyExchangeRateEventDeserializer; import dev.vality.newway.serde.PayoutEventDeserializer; import dev.vality.newway.serde.SinkEventDeserializer; import dev.vality.payout.manager.Event; @@ -33,6 +35,9 @@ public class KafkaConfig { @Value("${kafka.topics.party-management.consumer.group-id}") private String partyConsumerGroup; + @Value("${kafka.topics.exrate.consumer.group-id}") + private String exrateConsumerGroup; + @Bean public Map consumerConfigs() { return createConsumerConfig(); @@ -136,9 +141,20 @@ public KafkaListenerContainerFactory> createConcurrentFactory( - ConsumerFactory consumerFactory, int threadsNumber) { - ConcurrentKafkaListenerContainerFactory factory = + @Bean + public KafkaListenerContainerFactory> exchangeRateContainerFactory() { + Map props = kafkaProperties.buildConsumerProperties(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CurrencyExchangeRateEventDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, exrateConsumerGroup); + ConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(props); + + return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getExrateConcurrency()); + } + + private KafkaListenerContainerFactory> createConcurrentFactory( + ConsumerFactory consumerFactory, int threadsNumber) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); initFactory(consumerFactory, threadsNumber, factory); return factory; diff --git a/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java b/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java index a4dab743..b6536a6f 100644 --- a/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java +++ b/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java @@ -25,5 +25,6 @@ public class KafkaConsumerProperties { private int destinationConcurrency; private int withdrawalSessionConcurrency; private int limitConfigConcurrency; + private int exrateConcurrency; } diff --git a/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java b/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java new file mode 100644 index 00000000..8fbe0a0c --- /dev/null +++ b/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java @@ -0,0 +1,10 @@ +package dev.vality.newway.dao.exrate.iface; + +import dev.vality.dao.DaoException; +import dev.vality.dao.GenericDao; +import dev.vality.newway.domain.tables.pojos.Exrate; + +public interface ExchangeRateDao extends GenericDao { + void save(Exrate exchangeRate) throws DaoException; + Exrate findBySourceSymbolicCode(String symbolicCode); +} diff --git a/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java new file mode 100644 index 00000000..f4c2029b --- /dev/null +++ b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java @@ -0,0 +1,45 @@ +package dev.vality.newway.dao.exrate.impl; + +import dev.vality.dao.DaoException; +import dev.vality.dao.impl.AbstractGenericDao; +import dev.vality.mapper.RecordRowMapper; +import dev.vality.newway.dao.exrate.iface.ExchangeRateDao; +import dev.vality.newway.domain.tables.pojos.Exrate; +import dev.vality.newway.domain.tables.records.ExrateRecord; +import org.jooq.Query; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Component; + +import javax.sql.DataSource; + +import static dev.vality.newway.domain.tables.Exrate.EXRATE; + +@Component +public class ExchangeRateDaoImpl extends AbstractGenericDao implements ExchangeRateDao { + + private final RowMapper rowMapper; + + @Autowired + public ExchangeRateDaoImpl(@Qualifier("dataSource") DataSource dataSource) { + super(dataSource); + this.rowMapper = new RecordRowMapper<>(EXRATE, Exrate.class); + } + + @Override + public void save(Exrate exchangeRate) throws DaoException { + ExrateRecord record = getDslContext().newRecord(EXRATE, exchangeRate); + Query query = getDslContext().insertInto(EXRATE).set(record) + .onConflict(EXRATE.EVENT_ID) + .doNothing(); + execute(query); + } + + @Override + public Exrate findBySourceSymbolicCode(String symbolicCode) { + Query query = getDslContext().selectFrom(EXRATE) + .where(EXRATE.SOURCE_CURRENCY_SYMBOLIC_CODE.eq(symbolicCode)); + return fetchOne(query, rowMapper); + } +} diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java new file mode 100644 index 00000000..688b3a9c --- /dev/null +++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java @@ -0,0 +1,51 @@ +package dev.vality.newway.handler.event.stock.impl.exrate; + +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.exrates.events.CurrencyExchangeRate; +import dev.vality.geck.common.util.TypeUtil; +import dev.vality.newway.dao.exrate.iface.ExchangeRateDao; +import dev.vality.newway.domain.tables.pojos.Exrate; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Slf4j +@Component +@RequiredArgsConstructor +public class CurrencyExchangeRateHandler implements ExchangeRateHandler { + + private final ExchangeRateDao exchangeRateDao; + + @Override + public void handle(CurrencyEvent event) { + log.info("Handle currency event. eventId={};sourceCurrency={};destinationCurrency={}", + event.getEventId(), + event.payload.getExchangeRate().getSourceCurrency(), + event.payload.getExchangeRate().getDestinationCurrency()); + CurrencyExchangeRate exchangeRate = event.getPayload().getExchangeRate(); + Exrate exrate = new Exrate(); + exrate.setEventId(UUID.fromString(event.getEventId())); + exrate.setEventCreatedAt(TypeUtil.stringToLocalDateTime(event.getEventCreatedAt())); + exrate.setSourceCurrencySymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()); + exrate.setSourceCurrencyExponent(exchangeRate.getSourceCurrency().getExponent()); + exrate.setDestinationCurrencySymbolicCode(exchangeRate.getDestinationCurrency().getSymbolicCode()); + exrate.setDestinationCurrencyExponent(exchangeRate.getDestinationCurrency().getExponent()); + exrate.setRationalP(exchangeRate.getExchangeRate().getP()); + exrate.setRationalQ(exchangeRate.getExchangeRate().getQ()); + exrate.setRateTimestamp(TypeUtil.stringToLocalDateTime(exchangeRate.timestamp)); + + exchangeRateDao.save(exrate); + + log.info("The exchange rate was successfully saved. eventId={};sourceCurrency={};destinationCurrency={}", + event.getEventId(), + event.payload.getExchangeRate().getSourceCurrency(), + event.payload.getExchangeRate().getDestinationCurrency()); + } + + @Override + public boolean isHandle(CurrencyEvent currencyEvent) { + return currencyEvent.payload.isSetExchangeRate(); + } +} diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java new file mode 100644 index 00000000..0d672913 --- /dev/null +++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java @@ -0,0 +1,10 @@ +package dev.vality.newway.handler.event.stock.impl.exrate; + +import dev.vality.exrates.events.CurrencyEvent; + +public interface ExchangeRateHandler { + + void handle(CurrencyEvent currencyEvent); + + boolean isHandle(CurrencyEvent currencyEvent); +} diff --git a/src/main/java/dev/vality/newway/listener/ExchangeRateListener.java b/src/main/java/dev/vality/newway/listener/ExchangeRateListener.java new file mode 100644 index 00000000..7fb72020 --- /dev/null +++ b/src/main/java/dev/vality/newway/listener/ExchangeRateListener.java @@ -0,0 +1,36 @@ +package dev.vality.newway.listener; + +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.newway.service.ExchangeRateService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@RequiredArgsConstructor +@Service +public class ExchangeRateListener { + + private final ExchangeRateService exchangeRateService; + + @KafkaListener( + autoStartup = "${kafka.topics.exrate.enabled}", + topics = "${kafka.topics.exrate.id}", + containerFactory = "exchangeRateContainerFactory") + public void handle(List> messages, Acknowledgment ack) { + log.info("Got ExchangeRate messages batch with size: {}", messages.size()); + exchangeRateService.handleEvents( + messages.stream() + .map(ConsumerRecord::value) + .collect(Collectors.toList()) + ); + ack.acknowledge(); + log.info("Batch ExchangeRate has been committed, size={}", messages.size()); + } +} diff --git a/src/main/java/dev/vality/newway/serde/CurrencyExchangeRateEventDeserializer.java b/src/main/java/dev/vality/newway/serde/CurrencyExchangeRateEventDeserializer.java new file mode 100644 index 00000000..53096d9b --- /dev/null +++ b/src/main/java/dev/vality/newway/serde/CurrencyExchangeRateEventDeserializer.java @@ -0,0 +1,11 @@ +package dev.vality.newway.serde; + +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.kafka.common.serialization.AbstractThriftDeserializer; + +public class CurrencyExchangeRateEventDeserializer extends AbstractThriftDeserializer { + @Override + public CurrencyEvent deserialize(String topic, byte[] data) { + return deserialize(data, new CurrencyEvent()); + } +} diff --git a/src/main/java/dev/vality/newway/service/ExchangeRateService.java b/src/main/java/dev/vality/newway/service/ExchangeRateService.java new file mode 100644 index 00000000..d95cc5c3 --- /dev/null +++ b/src/main/java/dev/vality/newway/service/ExchangeRateService.java @@ -0,0 +1,27 @@ +package dev.vality.newway.service; + +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.newway.handler.event.stock.impl.exrate.ExchangeRateHandler; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Service +@RequiredArgsConstructor +public class ExchangeRateService { + + private final List exchangeRateHandlers; + + @Transactional(propagation = Propagation.REQUIRED) + public void handleEvents(List events) { + for (CurrencyEvent event : events) { + exchangeRateHandlers.stream() + .filter(exchangeRateHandler -> exchangeRateHandler.isHandle(event)) + .forEach(exchangeRateHandler -> exchangeRateHandler.handle(event)); + } + } + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 1ac43272..be610498 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -67,6 +67,7 @@ kafka: destination-concurrency: 7 withdrawal-session-concurrency: 7 limit-config-concurrency: 7 + exrate-concurrency: 7 topics: invoice: id: mg-invoice-100-2 @@ -108,6 +109,10 @@ kafka: limit-config: id: mg-events-lim-config enabled: false + exrate: + id: etl-exchange-rate + enabled: false + consumer.group-id: "newway-exrate" dmt: url: http://dominant:8022/v1/domain/repository diff --git a/src/main/resources/db/migration/V7__add_new_exchange_rates.sql b/src/main/resources/db/migration/V7__add_new_exchange_rates.sql new file mode 100644 index 00000000..41c48a48 --- /dev/null +++ b/src/main/resources/db/migration/V7__add_new_exchange_rates.sql @@ -0,0 +1,19 @@ +CREATE TABLE dw.exrate +( + id BIGSERIAL NOT NULL, + event_id uuid UNIQUE NOT NULL, + event_created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, + source_currency_symbolic_code CHARACTER VARYING NOT NULL, + source_currency_exponent SMALLINT NOT NULL, + destination_currency_symbolic_code CHARACTER VARYING NOT NULL, + destination_currency_exponent SMALLINT NOT NULL, + rational_p BIGINT NOT NULL, + rational_q BIGINT NOT NULL, + rate_timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL +); + +CREATE INDEX rate_timestamp_idx ON dw.exrate (rate_timestamp); + +CREATE INDEX source_currency_sc_destination_currency_sc_timestamp_idx ON dw.exrate (source_currency_symbolic_code, + destination_currency_symbolic_code, + rate_timestamp); diff --git a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java index 21722a42..7cf40230 100644 --- a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java +++ b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java @@ -28,7 +28,9 @@ "kafka.topics.source.enabled=true", "kafka.topics.destination.enabled=true", "kafka.topics.pm-events-payout.enabled=true", - "kafka.topics.limit-config.enabled=true"}, + "kafka.topics.limit-config.enabled=true", + "kafka.topics.limit-config.enabled=true", + "kafka.topics.exrate.enabled=true"}, topicsKeys = { "kafka.topics.invoice.id", "kafka.topics.recurrent-payment-tool.id", @@ -42,7 +44,10 @@ "kafka.topics.source.id", "kafka.topics.destination.id", "kafka.topics.pm-events-payout.id", - "kafka.topics.limit-config.id"}) + "kafka.topics.limit-config.id", + "kafka.topics.limit-config.id", + "kafka.topics.exrate.id"} +) @DefaultSpringBootTest @Import(KafkaProducer.class) public @interface KafkaPostgresqlSpringBootITest { diff --git a/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java b/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java new file mode 100644 index 00000000..3e5d37d4 --- /dev/null +++ b/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java @@ -0,0 +1,89 @@ +package dev.vality.newway.kafka; + +import dev.vality.exrates.base.Rational; +import dev.vality.exrates.events.Currency; +import dev.vality.exrates.events.CurrencyEvent; +import dev.vality.exrates.events.CurrencyEventPayload; +import dev.vality.exrates.events.CurrencyExchangeRate; +import dev.vality.geck.common.util.TypeUtil; +import dev.vality.newway.config.KafkaPostgresqlSpringBootITest; +import dev.vality.newway.dao.exrate.iface.ExchangeRateDao; +import dev.vality.newway.domain.tables.pojos.Exrate; +import dev.vality.newway.service.ExchangeRateService; +import org.apache.thrift.TBase; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.mock.mockito.SpyBean; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +@KafkaPostgresqlSpringBootITest +public class ExchangeRateKafkaListenerTest { + + @Value("${kafka.topics.exrate.id}") + public String topic; + + @Autowired + private dev.vality.testcontainers.annotations.kafka.config.KafkaProducer> testThriftKafkaProducer; + + @SpyBean + private ExchangeRateService exchangeRateService; + + @Autowired + private ExchangeRateDao exchangeRateDao; + + @Test + public void listenExchangeRateEventTest() { + // Given + CurrencyEvent currencyEvent = buildCurrencyEvent(); + CurrencyExchangeRate exchangeRate = currencyEvent.payload.getExchangeRate(); + + // When + testThriftKafkaProducer.send(topic, currencyEvent); + await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> { + return exchangeRateDao.findBySourceSymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()) != null; + }); + Exrate exrate = exchangeRateDao.findBySourceSymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()); + + // Then + verify(exchangeRateService, timeout(TimeUnit.MINUTES.toMillis(1)).times(1)).handleEvents(anyList()); + assertEquals(exchangeRate.getSourceCurrency().getSymbolicCode(), exrate.getSourceCurrencySymbolicCode()); + assertEquals(exchangeRate.getSourceCurrency().getExponent(), exrate.getSourceCurrencyExponent()); + assertEquals(exchangeRate.getDestinationCurrency().getSymbolicCode(), exrate.getDestinationCurrencySymbolicCode()); + assertEquals(exchangeRate.getDestinationCurrency().getExponent(), exrate.getDestinationCurrencyExponent()); + assertEquals(exchangeRate.getExchangeRate().p, exrate.getRationalP()); + assertEquals(exchangeRate.getExchangeRate().q, exrate.getRationalQ()); + assertEquals(TypeUtil.stringToLocalDateTime(exchangeRate.getTimestamp()).truncatedTo(ChronoUnit.SECONDS), + exrate.getRateTimestamp().truncatedTo(ChronoUnit.SECONDS)); + } + + private CurrencyEvent buildCurrencyEvent() { + CurrencyEvent currencyEvent = new CurrencyEvent(); + currencyEvent.setEventId(UUID.randomUUID().toString()); + currencyEvent.setEventCreatedAt(TypeUtil.temporalToString(LocalDateTime.now())); + currencyEvent.setPayload( + CurrencyEventPayload.exchange_rate( + new CurrencyExchangeRate( + new Currency("USD", (short) 2), + new Currency("RUB", (short) 2), + new Rational(60797502, 1000000), + TypeUtil.temporalToString(Instant.now()) + ) + ) + ); + + return currencyEvent; + } + +} From c75f7c0e2484346f7be403c5231567e5a324d659 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Oct 2022 14:41:10 +0300 Subject: [PATCH 2/5] fix github action --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d36f85c5..38080b60 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -3,7 +3,7 @@ name: Build Maven Artifact on: pull_request: branches: - - '*' + - '**' jobs: build: From 2f306023a9702b27465a6353a4269381501c6fac Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Oct 2022 15:07:46 +0300 Subject: [PATCH 3/5] review fix --- src/main/resources/application.yml | 2 +- .../vality/newway/config/KafkaPostgresqlSpringBootITest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index be610498..545201fc 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -112,7 +112,7 @@ kafka: exrate: id: etl-exchange-rate enabled: false - consumer.group-id: "newway-exrate" + consumer.group-id: "daway-exrate" dmt: url: http://dominant:8022/v1/domain/repository diff --git a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java index 7cf40230..88f9bf01 100644 --- a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java +++ b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java @@ -29,7 +29,6 @@ "kafka.topics.destination.enabled=true", "kafka.topics.pm-events-payout.enabled=true", "kafka.topics.limit-config.enabled=true", - "kafka.topics.limit-config.enabled=true", "kafka.topics.exrate.enabled=true"}, topicsKeys = { "kafka.topics.invoice.id", From f6b71f3e3d496e7e95e4cb6b9f983f6d17608fde Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Oct 2022 16:52:07 +0300 Subject: [PATCH 4/5] review fix [2] --- .../dao/exrate/iface/ExchangeRateDao.java | 8 ++-- .../dao/exrate/impl/ExchangeRateDaoImpl.java | 30 +++++++------ .../exrate/CurrencyExchangeRateHandler.java | 44 +++++++++---------- .../impl/exrate/ExchangeRateHandler.java | 4 +- .../newway/service/ExchangeRateService.java | 12 ++--- .../migration/V7__add_new_exchange_rates.sql | 6 +-- .../kafka/ExchangeRateKafkaListenerTest.java | 4 +- 7 files changed, 56 insertions(+), 52 deletions(-) diff --git a/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java b/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java index 8fbe0a0c..19bda164 100644 --- a/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java +++ b/src/main/java/dev/vality/newway/dao/exrate/iface/ExchangeRateDao.java @@ -2,9 +2,11 @@ import dev.vality.dao.DaoException; import dev.vality.dao.GenericDao; -import dev.vality.newway.domain.tables.pojos.Exrate; +import dev.vality.newway.domain.tables.pojos.ExRate; + +import java.util.List; public interface ExchangeRateDao extends GenericDao { - void save(Exrate exchangeRate) throws DaoException; - Exrate findBySourceSymbolicCode(String symbolicCode); + void saveBatch(List exchangeRates) throws DaoException; + ExRate findBySourceSymbolicCode(String symbolicCode); } diff --git a/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java index f4c2029b..4aeacb5d 100644 --- a/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java +++ b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java @@ -4,8 +4,7 @@ import dev.vality.dao.impl.AbstractGenericDao; import dev.vality.mapper.RecordRowMapper; import dev.vality.newway.dao.exrate.iface.ExchangeRateDao; -import dev.vality.newway.domain.tables.pojos.Exrate; -import dev.vality.newway.domain.tables.records.ExrateRecord; +import dev.vality.newway.domain.tables.pojos.ExRate; import org.jooq.Query; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -13,33 +12,36 @@ import org.springframework.stereotype.Component; import javax.sql.DataSource; +import java.util.List; +import java.util.stream.Collectors; -import static dev.vality.newway.domain.tables.Exrate.EXRATE; +import static dev.vality.newway.domain.tables.ExRate.EX_RATE; @Component public class ExchangeRateDaoImpl extends AbstractGenericDao implements ExchangeRateDao { - private final RowMapper rowMapper; + private final RowMapper rowMapper; @Autowired public ExchangeRateDaoImpl(@Qualifier("dataSource") DataSource dataSource) { super(dataSource); - this.rowMapper = new RecordRowMapper<>(EXRATE, Exrate.class); + this.rowMapper = new RecordRowMapper<>(EX_RATE, ExRate.class); } @Override - public void save(Exrate exchangeRate) throws DaoException { - ExrateRecord record = getDslContext().newRecord(EXRATE, exchangeRate); - Query query = getDslContext().insertInto(EXRATE).set(record) - .onConflict(EXRATE.EVENT_ID) - .doNothing(); - execute(query); + public void saveBatch(List exchangeRates) throws DaoException { + List queryList = exchangeRates.stream() + .map(exrate -> getDslContext().newRecord(EX_RATE, exrate)) + .map(record -> (Query) getDslContext().insertInto(EX_RATE).set(record) + .onConflict(EX_RATE.EVENT_ID) + .doNothing()).collect(Collectors.toList()); + batchExecute(queryList); } @Override - public Exrate findBySourceSymbolicCode(String symbolicCode) { - Query query = getDslContext().selectFrom(EXRATE) - .where(EXRATE.SOURCE_CURRENCY_SYMBOLIC_CODE.eq(symbolicCode)); + public ExRate findBySourceSymbolicCode(String symbolicCode) { + Query query = getDslContext().selectFrom(EX_RATE) + .where(EX_RATE.SOURCE_CURRENCY_SYMBOLIC_CODE.eq(symbolicCode)); return fetchOne(query, rowMapper); } } diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java index 688b3a9c..3adec16d 100644 --- a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java +++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/CurrencyExchangeRateHandler.java @@ -4,12 +4,14 @@ import dev.vality.exrates.events.CurrencyExchangeRate; import dev.vality.geck.common.util.TypeUtil; import dev.vality.newway.dao.exrate.iface.ExchangeRateDao; -import dev.vality.newway.domain.tables.pojos.Exrate; +import dev.vality.newway.domain.tables.pojos.ExRate; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; @Slf4j @Component @@ -19,29 +21,23 @@ public class CurrencyExchangeRateHandler implements ExchangeRateHandler { private final ExchangeRateDao exchangeRateDao; @Override - public void handle(CurrencyEvent event) { - log.info("Handle currency event. eventId={};sourceCurrency={};destinationCurrency={}", - event.getEventId(), - event.payload.getExchangeRate().getSourceCurrency(), - event.payload.getExchangeRate().getDestinationCurrency()); - CurrencyExchangeRate exchangeRate = event.getPayload().getExchangeRate(); - Exrate exrate = new Exrate(); - exrate.setEventId(UUID.fromString(event.getEventId())); - exrate.setEventCreatedAt(TypeUtil.stringToLocalDateTime(event.getEventCreatedAt())); - exrate.setSourceCurrencySymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()); - exrate.setSourceCurrencyExponent(exchangeRate.getSourceCurrency().getExponent()); - exrate.setDestinationCurrencySymbolicCode(exchangeRate.getDestinationCurrency().getSymbolicCode()); - exrate.setDestinationCurrencyExponent(exchangeRate.getDestinationCurrency().getExponent()); - exrate.setRationalP(exchangeRate.getExchangeRate().getP()); - exrate.setRationalQ(exchangeRate.getExchangeRate().getQ()); - exrate.setRateTimestamp(TypeUtil.stringToLocalDateTime(exchangeRate.timestamp)); - - exchangeRateDao.save(exrate); - - log.info("The exchange rate was successfully saved. eventId={};sourceCurrency={};destinationCurrency={}", - event.getEventId(), - event.payload.getExchangeRate().getSourceCurrency(), - event.payload.getExchangeRate().getDestinationCurrency()); + public void handle(List events) { + List exrates = events.stream().map(currencyEvent -> { + CurrencyExchangeRate exchangeRate = currencyEvent.getPayload().getExchangeRate(); + ExRate exrate = new ExRate(); + exrate.setEventId(UUID.fromString(currencyEvent.getEventId())); + exrate.setEventCreatedAt(TypeUtil.stringToLocalDateTime(currencyEvent.getEventCreatedAt())); + exrate.setSourceCurrencySymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()); + exrate.setSourceCurrencyExponent(exchangeRate.getSourceCurrency().getExponent()); + exrate.setDestinationCurrencySymbolicCode(exchangeRate.getDestinationCurrency().getSymbolicCode()); + exrate.setDestinationCurrencyExponent(exchangeRate.getDestinationCurrency().getExponent()); + exrate.setRationalP(exchangeRate.getExchangeRate().getP()); + exrate.setRationalQ(exchangeRate.getExchangeRate().getQ()); + exrate.setRateTimestamp(TypeUtil.stringToLocalDateTime(exchangeRate.timestamp)); + return exrate; + }).collect(Collectors.toList()); + + exchangeRateDao.saveBatch(exrates); } @Override diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java index 0d672913..006df3ae 100644 --- a/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java +++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/exrate/ExchangeRateHandler.java @@ -2,9 +2,11 @@ import dev.vality.exrates.events.CurrencyEvent; +import java.util.List; + public interface ExchangeRateHandler { - void handle(CurrencyEvent currencyEvent); + void handle(List currencyEvents); boolean isHandle(CurrencyEvent currencyEvent); } diff --git a/src/main/java/dev/vality/newway/service/ExchangeRateService.java b/src/main/java/dev/vality/newway/service/ExchangeRateService.java index d95cc5c3..1f9153ab 100644 --- a/src/main/java/dev/vality/newway/service/ExchangeRateService.java +++ b/src/main/java/dev/vality/newway/service/ExchangeRateService.java @@ -8,6 +8,7 @@ import org.springframework.transaction.annotation.Transactional; import java.util.List; +import java.util.stream.Collectors; @Service @RequiredArgsConstructor @@ -17,11 +18,12 @@ public class ExchangeRateService { @Transactional(propagation = Propagation.REQUIRED) public void handleEvents(List events) { - for (CurrencyEvent event : events) { - exchangeRateHandlers.stream() - .filter(exchangeRateHandler -> exchangeRateHandler.isHandle(event)) - .forEach(exchangeRateHandler -> exchangeRateHandler.handle(event)); - } + events.stream() + .collect(Collectors.groupingBy( + currencyEvent -> exchangeRateHandlers.stream() + .filter(exchangeRateHandler -> exchangeRateHandler.isHandle(currencyEvent)) + .findAny().orElseThrow()) + ).forEach(ExchangeRateHandler::handle); } } diff --git a/src/main/resources/db/migration/V7__add_new_exchange_rates.sql b/src/main/resources/db/migration/V7__add_new_exchange_rates.sql index 41c48a48..4a06012c 100644 --- a/src/main/resources/db/migration/V7__add_new_exchange_rates.sql +++ b/src/main/resources/db/migration/V7__add_new_exchange_rates.sql @@ -1,4 +1,4 @@ -CREATE TABLE dw.exrate +CREATE TABLE dw.ex_rate ( id BIGSERIAL NOT NULL, event_id uuid UNIQUE NOT NULL, @@ -12,8 +12,8 @@ CREATE TABLE dw.exrate rate_timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL ); -CREATE INDEX rate_timestamp_idx ON dw.exrate (rate_timestamp); +CREATE INDEX rate_timestamp_idx ON dw.ex_rate (rate_timestamp); -CREATE INDEX source_currency_sc_destination_currency_sc_timestamp_idx ON dw.exrate (source_currency_symbolic_code, +CREATE INDEX source_currency_sc_destination_currency_sc_timestamp_idx ON dw.ex_rate (source_currency_symbolic_code, destination_currency_symbolic_code, rate_timestamp); diff --git a/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java b/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java index 3e5d37d4..e1db09f9 100644 --- a/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java +++ b/src/test/java/dev/vality/newway/kafka/ExchangeRateKafkaListenerTest.java @@ -8,7 +8,7 @@ import dev.vality.geck.common.util.TypeUtil; import dev.vality.newway.config.KafkaPostgresqlSpringBootITest; import dev.vality.newway.dao.exrate.iface.ExchangeRateDao; -import dev.vality.newway.domain.tables.pojos.Exrate; +import dev.vality.newway.domain.tables.pojos.ExRate; import dev.vality.newway.service.ExchangeRateService; import org.apache.thrift.TBase; import org.junit.jupiter.api.Test; @@ -54,7 +54,7 @@ public void listenExchangeRateEventTest() { await().atMost(30, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> { return exchangeRateDao.findBySourceSymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()) != null; }); - Exrate exrate = exchangeRateDao.findBySourceSymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()); + ExRate exrate = exchangeRateDao.findBySourceSymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode()); // Then verify(exchangeRateService, timeout(TimeUnit.MINUTES.toMillis(1)).times(1)).handleEvents(anyList()); From 929e839ce845f0eb939944e799ea7a8d19889149 Mon Sep 17 00:00:00 2001 From: vitaxa Date: Tue, 18 Oct 2022 20:05:21 +0300 Subject: [PATCH 5/5] review fix [3] --- .../dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java index 4aeacb5d..22474563 100644 --- a/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java +++ b/src/main/java/dev/vality/newway/dao/exrate/impl/ExchangeRateDaoImpl.java @@ -34,7 +34,8 @@ public void saveBatch(List exchangeRates) throws DaoException { .map(exrate -> getDslContext().newRecord(EX_RATE, exrate)) .map(record -> (Query) getDslContext().insertInto(EX_RATE).set(record) .onConflict(EX_RATE.EVENT_ID) - .doNothing()).collect(Collectors.toList()); + .doNothing()) + .collect(Collectors.toList()); batchExecute(queryList); }