Skip to content
This repository was archived by the owner on Oct 10, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Maven Build Artifact
name: Build Maven Artifact

on:
pull_request:
Expand All @@ -7,4 +7,4 @@ on:

jobs:
build:
uses: valitydev/java-workflow/.github/workflows/maven-service-build.yml@v1
uses: valitydev/base-workflow/.github/workflows/maven-service-build.yml@v2-beta
10 changes: 3 additions & 7 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Maven Deploy Artifact
name: Deploy Docker Image

on:
push:
Expand All @@ -7,13 +7,9 @@ on:
- 'main'
- 'epic/**'

env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}

jobs:
deploy:
uses: valitydev/java-workflow/.github/workflows/maven-service-deploy.yml@v1
build-and-deploy:
uses: valitydev/base-workflow/.github/workflows/maven-service-deploy.yml@v2-beta
secrets:
github-token: ${{ secrets.GITHUB_TOKEN }}
mm-webhook-url: ${{ secrets.MATTERMOST_WEBHOOK_URL }}
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>1.0.18</version>
<version>2.0.0-BETA-11</version>
</parent>

<artifactId>newway</artifactId>
Expand Down Expand Up @@ -177,6 +177,11 @@
<artifactId>limiter-proto</artifactId>
<version>1.32-6158184</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>exrates-proto</artifactId>
<version>1.3-875328b</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>shared-resources</artifactId>
Expand All @@ -202,6 +207,12 @@
<version>1.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
22 changes: 19 additions & 3 deletions src/main/java/dev/vality/newway/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package dev.vality.newway.config;

import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.kafka.common.util.ExponentialBackOffDefaultErrorHandlerFactory;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.config.properties.KafkaConsumerProperties;
import dev.vality.newway.serde.CurrencyExchangeRateEventDeserializer;
import dev.vality.newway.serde.PayoutEventDeserializer;
import dev.vality.newway.serde.SinkEventDeserializer;
import dev.vality.payout.manager.Event;
Expand Down Expand Up @@ -33,6 +35,9 @@ public class KafkaConfig {
@Value("${kafka.topics.party-management.consumer.group-id}")
private String partyConsumerGroup;

@Value("${kafka.topics.exrate.consumer.group-id}")
private String exrateConsumerGroup;

@Bean
public Map<String, Object> consumerConfigs() {
return createConsumerConfig();
Expand Down Expand Up @@ -136,9 +141,20 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getLimitConfigConcurrency());
}

private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> createConcurrentFactory(
ConsumerFactory<String, MachineEvent> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory =
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, CurrencyEvent>> exchangeRateContainerFactory() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CurrencyExchangeRateEventDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, exrateConsumerGroup);
ConsumerFactory<String, CurrencyEvent> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getExrateConcurrency());
}

private <T> KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, T>> createConcurrentFactory(
ConsumerFactory<String, T> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, T> factory =
new ConcurrentKafkaListenerContainerFactory<>();
initFactory(consumerFactory, threadsNumber, factory);
return factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ public class KafkaConsumerProperties {
private int destinationConcurrency;
private int withdrawalSessionConcurrency;
private int limitConfigConcurrency;
private int exrateConcurrency;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dev.vality.newway.dao.exrate.iface;

import dev.vality.dao.DaoException;
import dev.vality.dao.GenericDao;
import dev.vality.newway.domain.tables.pojos.ExRate;

import java.util.List;

public interface ExchangeRateDao extends GenericDao {
void saveBatch(List<ExRate> exchangeRates) throws DaoException;
ExRate findBySourceSymbolicCode(String symbolicCode);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package dev.vality.newway.dao.exrate.impl;

import dev.vality.dao.DaoException;
import dev.vality.dao.impl.AbstractGenericDao;
import dev.vality.mapper.RecordRowMapper;
import dev.vality.newway.dao.exrate.iface.ExchangeRateDao;
import dev.vality.newway.domain.tables.pojos.ExRate;
import org.jooq.Query;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.List;
import java.util.stream.Collectors;

import static dev.vality.newway.domain.tables.ExRate.EX_RATE;

@Component
public class ExchangeRateDaoImpl extends AbstractGenericDao implements ExchangeRateDao {

private final RowMapper<ExRate> rowMapper;

@Autowired
public ExchangeRateDaoImpl(@Qualifier("dataSource") DataSource dataSource) {
super(dataSource);
this.rowMapper = new RecordRowMapper<>(EX_RATE, ExRate.class);
}

@Override
public void saveBatch(List<ExRate> exchangeRates) throws DaoException {
List<Query> queryList = exchangeRates.stream()
.map(exrate -> getDslContext().newRecord(EX_RATE, exrate))
.map(record -> (Query) getDslContext().insertInto(EX_RATE).set(record)
.onConflict(EX_RATE.EVENT_ID)
.doNothing())
.collect(Collectors.toList());
batchExecute(queryList);
}

@Override
public ExRate findBySourceSymbolicCode(String symbolicCode) {
Query query = getDslContext().selectFrom(EX_RATE)
.where(EX_RATE.SOURCE_CURRENCY_SYMBOLIC_CODE.eq(symbolicCode));
return fetchOne(query, rowMapper);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package dev.vality.newway.handler.event.stock.impl.exrate;

import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.exrates.events.CurrencyExchangeRate;
import dev.vality.geck.common.util.TypeUtil;
import dev.vality.newway.dao.exrate.iface.ExchangeRateDao;
import dev.vality.newway.domain.tables.pojos.ExRate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class CurrencyExchangeRateHandler implements ExchangeRateHandler {

private final ExchangeRateDao exchangeRateDao;

@Override
public void handle(List<CurrencyEvent> events) {
List<ExRate> exrates = events.stream().map(currencyEvent -> {
CurrencyExchangeRate exchangeRate = currencyEvent.getPayload().getExchangeRate();
ExRate exrate = new ExRate();
exrate.setEventId(UUID.fromString(currencyEvent.getEventId()));
exrate.setEventCreatedAt(TypeUtil.stringToLocalDateTime(currencyEvent.getEventCreatedAt()));
exrate.setSourceCurrencySymbolicCode(exchangeRate.getSourceCurrency().getSymbolicCode());
exrate.setSourceCurrencyExponent(exchangeRate.getSourceCurrency().getExponent());
exrate.setDestinationCurrencySymbolicCode(exchangeRate.getDestinationCurrency().getSymbolicCode());
exrate.setDestinationCurrencyExponent(exchangeRate.getDestinationCurrency().getExponent());
exrate.setRationalP(exchangeRate.getExchangeRate().getP());
exrate.setRationalQ(exchangeRate.getExchangeRate().getQ());
exrate.setRateTimestamp(TypeUtil.stringToLocalDateTime(exchangeRate.timestamp));
return exrate;
Comment on lines +26 to +37

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Опционально, не хочешь вынести в конвертер или отдельный метод внутри handler'а?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nononononono

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По-хорошему, тут везде надо рефакторить и заводить конвертеры. Ну короч хз, читабельнее щас от этого не станет. Если очень хочешь, тогда сделаю конечно.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не, у меня были сомнения и я решил тебе о них сказать :)
вдруг бы тоже сомнения породил xD

}).collect(Collectors.toList());

exchangeRateDao.saveBatch(exrates);
}

@Override
public boolean isHandle(CurrencyEvent currencyEvent) {
return currencyEvent.payload.isSetExchangeRate();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dev.vality.newway.handler.event.stock.impl.exrate;

import dev.vality.exrates.events.CurrencyEvent;

import java.util.List;

public interface ExchangeRateHandler {

void handle(List<CurrencyEvent> currencyEvents);

boolean isHandle(CurrencyEvent currencyEvent);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package dev.vality.newway.listener;

import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.newway.service.ExchangeRateService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@RequiredArgsConstructor
@Service
public class ExchangeRateListener {

private final ExchangeRateService exchangeRateService;

@KafkaListener(
autoStartup = "${kafka.topics.exrate.enabled}",
topics = "${kafka.topics.exrate.id}",
containerFactory = "exchangeRateContainerFactory")
public void handle(List<ConsumerRecord<String, CurrencyEvent>> messages, Acknowledgment ack) {
log.info("Got ExchangeRate messages batch with size: {}", messages.size());
exchangeRateService.handleEvents(
messages.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList())
);
ack.acknowledge();
log.info("Batch ExchangeRate has been committed, size={}", messages.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dev.vality.newway.serde;

import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.kafka.common.serialization.AbstractThriftDeserializer;

public class CurrencyExchangeRateEventDeserializer extends AbstractThriftDeserializer<CurrencyEvent> {
@Override
public CurrencyEvent deserialize(String topic, byte[] data) {
return deserialize(data, new CurrencyEvent());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package dev.vality.newway.service;

import dev.vality.exrates.events.CurrencyEvent;
import dev.vality.newway.handler.event.stock.impl.exrate.ExchangeRateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
public class ExchangeRateService {

private final List<ExchangeRateHandler> exchangeRateHandlers;

@Transactional(propagation = Propagation.REQUIRED)
public void handleEvents(List<CurrencyEvent> events) {
events.stream()
.collect(Collectors.groupingBy(
currencyEvent -> exchangeRateHandlers.stream()
.filter(exchangeRateHandler -> exchangeRateHandler.isHandle(currencyEvent))
.findAny().orElseThrow())
).forEach(ExchangeRateHandler::handle);
}

}
5 changes: 5 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ kafka:
destination-concurrency: 7
withdrawal-session-concurrency: 7
limit-config-concurrency: 7
exrate-concurrency: 7
topics:
invoice:
id: mg-invoice-100-2
Expand Down Expand Up @@ -108,6 +109,10 @@ kafka:
limit-config:
id: mg-events-lim-config
enabled: false
exrate:
id: etl-exchange-rate
enabled: false
consumer.group-id: "daway-exrate"

dmt:
url: http://dominant:8022/v1/domain/repository
Expand Down
19 changes: 19 additions & 0 deletions src/main/resources/db/migration/V7__add_new_exchange_rates.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE TABLE dw.ex_rate
(
id BIGSERIAL NOT NULL,
event_id uuid UNIQUE NOT NULL,
event_created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
source_currency_symbolic_code CHARACTER VARYING NOT NULL,
source_currency_exponent SMALLINT NOT NULL,
destination_currency_symbolic_code CHARACTER VARYING NOT NULL,
destination_currency_exponent SMALLINT NOT NULL,
rational_p BIGINT NOT NULL,
rational_q BIGINT NOT NULL,
rate_timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);

CREATE INDEX rate_timestamp_idx ON dw.ex_rate (rate_timestamp);

CREATE INDEX source_currency_sc_destination_currency_sc_timestamp_idx ON dw.ex_rate (source_currency_symbolic_code,
destination_currency_symbolic_code,
rate_timestamp);
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"kafka.topics.source.enabled=true",
"kafka.topics.destination.enabled=true",
"kafka.topics.pm-events-payout.enabled=true",
"kafka.topics.limit-config.enabled=true"},
"kafka.topics.limit-config.enabled=true",
"kafka.topics.exrate.enabled=true"},
topicsKeys = {
"kafka.topics.invoice.id",
"kafka.topics.recurrent-payment-tool.id",
Expand All @@ -42,7 +43,10 @@
"kafka.topics.source.id",
"kafka.topics.destination.id",
"kafka.topics.pm-events-payout.id",
"kafka.topics.limit-config.id"})
"kafka.topics.limit-config.id",
"kafka.topics.limit-config.id",
Comment on lines +46 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

дубль

"kafka.topics.exrate.id"}
)
@DefaultSpringBootTest
@Import(KafkaProducer.class)
public @interface KafkaPostgresqlSpringBootITest {
Expand Down
Loading