diff --git a/pom.xml b/pom.xml
index 7a12a301..5ec32881 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,11 @@
xrates-proto
1.23-bf0d62d
+
+ dev.vality
+ limiter-proto
+ 1.32-6158184
+
dev.vality
shared-resources
diff --git a/src/main/java/dev/vality/newway/config/KafkaConfig.java b/src/main/java/dev/vality/newway/config/KafkaConfig.java
index ada9bbe9..9f602da0 100644
--- a/src/main/java/dev/vality/newway/config/KafkaConfig.java
+++ b/src/main/java/dev/vality/newway/config/KafkaConfig.java
@@ -7,14 +7,10 @@
import dev.vality.newway.serde.SinkEventDeserializer;
import dev.vality.payout.manager.Event;
import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -24,8 +20,6 @@
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
-import java.io.File;
-import java.util.HashMap;
import java.util.Map;
@Configuration
@@ -136,6 +130,12 @@ public KafkaListenerContainerFactory> limitConfigContainerFactory(
+ ConsumerFactory consumerFactory) {
+ return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getLimitConfigConcurrency());
+ }
+
private KafkaListenerContainerFactory> createConcurrentFactory(
ConsumerFactory consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory factory =
diff --git a/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java b/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java
index 596a83c8..a4dab743 100644
--- a/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java
+++ b/src/main/java/dev/vality/newway/config/properties/KafkaConsumerProperties.java
@@ -24,5 +24,6 @@ public class KafkaConsumerProperties {
private int sourceConcurrency;
private int destinationConcurrency;
private int withdrawalSessionConcurrency;
+ private int limitConfigConcurrency;
}
diff --git a/src/main/java/dev/vality/newway/dao/limiter/LimitConfigDao.java b/src/main/java/dev/vality/newway/dao/limiter/LimitConfigDao.java
new file mode 100644
index 00000000..128c1eca
--- /dev/null
+++ b/src/main/java/dev/vality/newway/dao/limiter/LimitConfigDao.java
@@ -0,0 +1,15 @@
+package dev.vality.newway.dao.limiter;
+
+import dev.vality.dao.GenericDao;
+import dev.vality.newway.domain.tables.pojos.LimitConfig;
+import dev.vality.newway.exception.DaoException;
+
+import java.util.Optional;
+
+public interface LimitConfigDao extends GenericDao {
+
+ Optional save(LimitConfig limitConfig) throws DaoException;
+
+ void updateNotCurrent(Long id) throws DaoException;
+
+}
diff --git a/src/main/java/dev/vality/newway/dao/limiter/LimitConfigDaoImpl.java b/src/main/java/dev/vality/newway/dao/limiter/LimitConfigDaoImpl.java
new file mode 100644
index 00000000..87fc02ad
--- /dev/null
+++ b/src/main/java/dev/vality/newway/dao/limiter/LimitConfigDaoImpl.java
@@ -0,0 +1,49 @@
+package dev.vality.newway.dao.limiter;
+
+import dev.vality.dao.impl.AbstractGenericDao;
+import dev.vality.mapper.RecordRowMapper;
+import dev.vality.newway.domain.tables.pojos.LimitConfig;
+import dev.vality.newway.exception.DaoException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.jdbc.support.GeneratedKeyHolder;
+import org.springframework.stereotype.Component;
+
+import javax.sql.DataSource;
+import java.util.Optional;
+
+import static dev.vality.newway.domain.Tables.LIMIT_CONFIG;
+
+@Component
+public class LimitConfigDaoImpl extends AbstractGenericDao implements LimitConfigDao {
+
+ private final RowMapper limitConfigRowMapper;
+
+ @Autowired
+ public LimitConfigDaoImpl(DataSource dataSource) {
+ super(dataSource);
+ limitConfigRowMapper = new RecordRowMapper<>(LIMIT_CONFIG, LimitConfig.class);
+ }
+
+ @Override
+ public Optional save(LimitConfig limitConfig) throws DaoException {
+ var limitConfigRecord = getDslContext().newRecord(LIMIT_CONFIG, limitConfig);
+ var query = getDslContext()
+ .insertInto(LIMIT_CONFIG)
+ .set(limitConfigRecord)
+ .onConflict(LIMIT_CONFIG.LIMIT_CONFIG_ID, LIMIT_CONFIG.SEQUENCE_ID)
+ .doNothing()
+ .returning(LIMIT_CONFIG.ID);
+ GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
+ execute(query, keyHolder);
+ return Optional.ofNullable(keyHolder.getKey()).map(Number::longValue);
+ }
+
+ @Override
+ public void updateNotCurrent(Long id) throws DaoException {
+ var query = getDslContext().update(LIMIT_CONFIG).set(LIMIT_CONFIG.CURRENT, false)
+ .where(LIMIT_CONFIG.ID.eq(id)
+ .and(LIMIT_CONFIG.CURRENT));
+ executeOne(query);
+ }
+}
diff --git a/src/main/java/dev/vality/newway/factory/machine/event/LimitConfigMachineEventCopyFactoryImpl.java b/src/main/java/dev/vality/newway/factory/machine/event/LimitConfigMachineEventCopyFactoryImpl.java
new file mode 100644
index 00000000..68b79e75
--- /dev/null
+++ b/src/main/java/dev/vality/newway/factory/machine/event/LimitConfigMachineEventCopyFactoryImpl.java
@@ -0,0 +1,27 @@
+package dev.vality.newway.factory.machine.event;
+
+import dev.vality.geck.common.util.TypeUtil;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.newway.domain.tables.pojos.LimitConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class LimitConfigMachineEventCopyFactoryImpl implements MachineEventCopyFactory {
+
+ @Override
+ public LimitConfig create(MachineEvent event, Long sequenceId, String id, LimitConfig old, String occurredAt) {
+ var limitConfig = old != null ? new LimitConfig(old) : new LimitConfig();
+ limitConfig.setId(null);
+ limitConfig.setWtime(null);
+ limitConfig.setSequenceId(sequenceId.intValue());
+ limitConfig.setSourceId(id);
+ limitConfig.setEventCreatedAt(TypeUtil.stringToLocalDateTime(event.getCreatedAt()));
+ limitConfig.setEventOccuredAt(TypeUtil.stringToLocalDateTime(occurredAt));
+ return limitConfig;
+ }
+
+ @Override
+ public LimitConfig create(MachineEvent event, Long sequenceId, String id, String occurredAt) {
+ return create(event, sequenceId, id, null, occurredAt);
+ }
+}
diff --git a/src/main/java/dev/vality/newway/handler/event/stock/Handler.java b/src/main/java/dev/vality/newway/handler/event/stock/Handler.java
index 6700e665..aa38a869 100644
--- a/src/main/java/dev/vality/newway/handler/event/stock/Handler.java
+++ b/src/main/java/dev/vality/newway/handler/event/stock/Handler.java
@@ -1,6 +1,5 @@
package dev.vality.newway.handler.event.stock;
-
import dev.vality.geck.filter.Filter;
import org.apache.commons.lang3.NotImplementedException;
diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/limiter/LimitConfigCreatedHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/limiter/LimitConfigCreatedHandler.java
new file mode 100644
index 00000000..b01aa1f0
--- /dev/null
+++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/limiter/LimitConfigCreatedHandler.java
@@ -0,0 +1,113 @@
+package dev.vality.newway.handler.event.stock.impl.limiter;
+
+import dev.vality.geck.common.util.TBaseUtil;
+import dev.vality.geck.common.util.TypeUtil;
+import dev.vality.geck.filter.Filter;
+import dev.vality.geck.filter.PathConditionFilter;
+import dev.vality.geck.filter.condition.IsNullCondition;
+import dev.vality.geck.filter.rule.PathConditionRule;
+import dev.vality.limiter.config.LimitScopeType;
+import dev.vality.limiter.config.TimestampedChange;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.newway.dao.limiter.LimitConfigDao;
+import dev.vality.newway.domain.enums.*;
+import dev.vality.newway.domain.tables.pojos.LimitConfig;
+import dev.vality.newway.factory.machine.event.MachineEventCopyFactory;
+import dev.vality.newway.util.JsonUtil;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class LimitConfigCreatedHandler implements LimitConfigHandler {
+
+ private final LimitConfigDao limitConfigDao;
+ private final MachineEventCopyFactory limitConfigMachineEventCopyFactory;
+
+ @Getter
+ private final Filter filter = new PathConditionFilter(new PathConditionRule(
+ "change.created",
+ new IsNullCondition().not()));
+
+ @Override
+ @Transactional(propagation = Propagation.REQUIRED)
+ public void handle(TimestampedChange timestampedChange, MachineEvent event) {
+ var change = timestampedChange.getChange();
+ var limitConfigSource = change.getCreated().getLimitConfig();
+ var limitConfigId = limitConfigSource.getId();
+ var sequenceId = event.getEventId();
+ var sourceId = event.getSourceId();
+ log.info("Start LimitConfig created handling, sequenceId={}, limitConfigId={}", sequenceId, limitConfigId);
+ var limitConfig = limitConfigMachineEventCopyFactory.create(
+ event, sequenceId, sourceId, timestampedChange.getOccuredAt());
+ limitConfig.setLimitConfigId(limitConfigId);
+ limitConfig.setProcessorType(limitConfigSource.getProcessorType());
+ limitConfig.setCreatedAt(TypeUtil.stringToLocalDateTime(limitConfigSource.getCreatedAt()));
+ limitConfig.setStartedAt(TypeUtil.stringToLocalDateTime(limitConfigSource.getStartedAt()));
+ limitConfig.setShardSize(limitConfigSource.getShardSize());
+ if (limitConfigSource.isSetTimeRangeType()) {
+ limitConfig.setTimeRangeType(TBaseUtil.unionFieldToEnum(
+ limitConfigSource.getTimeRangeType(), LimitConfigTimeRangeType.class));
+ switch (limitConfigSource.getTimeRangeType().getSetField()) {
+ case CALENDAR -> limitConfig.setTimeRangeTypeCalendar(TBaseUtil.unionFieldToEnum(
+ limitConfigSource.getTimeRangeType().getCalendar(),
+ LimitConfigTimeRangeTypeCalendar.class));
+ case INTERVAL -> limitConfig.setTimeRangeTypeIntervalAmount(
+ limitConfigSource.getTimeRangeType().getInterval().getAmount());
+ }
+ }
+ if (limitConfigSource.isSetContextType()) {
+ limitConfig.setLimitContextType(TBaseUtil.unionFieldToEnum(
+ limitConfigSource.getContextType(), LimitConfigLimitContextType.class));
+ }
+ if (limitConfigSource.isSetType()) {
+ if (limitConfigSource.getType().isSetTurnover()) {
+ var turnover = limitConfigSource.getType().getTurnover();
+ if (turnover.isSetMetric()) {
+ var metric = turnover.getMetric();
+ limitConfig.setLimitTypeTurnoverMetric(
+ TBaseUtil.unionFieldToEnum(metric, LimitConfigLimitTypeTurnoverMetric.class));
+ if (metric.isSetAmount()) {
+ limitConfig.setLimitTypeTurnoverMetricAmountCurrency(metric.getAmount().getCurrency());
+ }
+ }
+ }
+ }
+ if (limitConfigSource.isSetScope()) {
+ limitConfig.setLimitScope(TBaseUtil.unionFieldToEnum(
+ limitConfigSource.getScope(), LimitConfigLimitScope.class));
+ switch (limitConfigSource.getScope().getSetField()) {
+ case SINGLE -> limitConfig.setLimitScopeTypesJson(
+ getLimitScopeTypesJson(Set.of(limitConfigSource.getScope().getSingle())));
+ case MULTI -> limitConfig.setLimitScopeTypesJson(
+ getLimitScopeTypesJson(limitConfigSource.getScope().getMulti()));
+ }
+ }
+ limitConfig.setDescription(limitConfigSource.getDescription());
+ if (limitConfigSource.isSetOpBehaviour() && limitConfigSource.getOpBehaviour().isSetInvoicePaymentRefund()) {
+ limitConfig.setOperationLimitBehaviour(TBaseUtil.unionFieldToEnum(
+ limitConfigSource.getOpBehaviour().getInvoicePaymentRefund(),
+ LimitConfigOperationLimitBehaviour.class));
+ }
+
+ limitConfigDao.save(limitConfig).ifPresentOrElse(
+ dbContractId -> log.info("LimitConfig created has been saved, sequenceId={}, limitConfigId={}",
+ sequenceId, limitConfigId),
+ () -> log.info("LimitConfig created bound duplicated, sequenceId={}, limitConfigId={}",
+ sequenceId, limitConfigId));
+ }
+
+ private String getLimitScopeTypesJson(Set limitScopeTypes) {
+ return JsonUtil.objectToJsonString(limitScopeTypes.stream()
+ .map(JsonUtil::thriftBaseToJsonNode)
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/src/main/java/dev/vality/newway/handler/event/stock/impl/limiter/LimitConfigHandler.java b/src/main/java/dev/vality/newway/handler/event/stock/impl/limiter/LimitConfigHandler.java
new file mode 100644
index 00000000..504b5d53
--- /dev/null
+++ b/src/main/java/dev/vality/newway/handler/event/stock/impl/limiter/LimitConfigHandler.java
@@ -0,0 +1,8 @@
+package dev.vality.newway.handler.event.stock.impl.limiter;
+
+import dev.vality.limiter.config.TimestampedChange;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.newway.handler.event.stock.Handler;
+
+public interface LimitConfigHandler extends Handler {
+}
diff --git a/src/main/java/dev/vality/newway/listener/LimitConfigKafkaListener.java b/src/main/java/dev/vality/newway/listener/LimitConfigKafkaListener.java
new file mode 100644
index 00000000..26cb18e9
--- /dev/null
+++ b/src/main/java/dev/vality/newway/listener/LimitConfigKafkaListener.java
@@ -0,0 +1,36 @@
+package dev.vality.newway.listener;
+
+import dev.vality.kafka.common.util.LogUtil;
+import dev.vality.machinegun.eventsink.SinkEvent;
+import dev.vality.newway.service.LimitConfigService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@RequiredArgsConstructor
+@Service
+public class LimitConfigKafkaListener {
+
+ private final LimitConfigService limitConfigService;
+
+ @KafkaListener(
+ autoStartup = "${kafka.topics.limit-config.enabled}",
+ topics = "${kafka.topics.limit-config.id}",
+ containerFactory = "limitConfigContainerFactory")
+ public void handle(List> messages, Acknowledgment ack) {
+ log.info("Got machineEvent batch with size: {}", messages.size());
+ limitConfigService.handleEvents(messages.stream()
+ .map(m -> m.value().getEvent())
+ .collect(Collectors.toList()));
+ ack.acknowledge();
+ log.info("Batch has been committed, size={}, {}", messages.size(),
+ LogUtil.toSummaryStringWithSinkEventValues(messages));
+ }
+}
diff --git a/src/main/java/dev/vality/newway/serde/LimitConfigChangeMachineEventParser.java b/src/main/java/dev/vality/newway/serde/LimitConfigChangeMachineEventParser.java
new file mode 100644
index 00000000..2c4cc1dc
--- /dev/null
+++ b/src/main/java/dev/vality/newway/serde/LimitConfigChangeMachineEventParser.java
@@ -0,0 +1,14 @@
+package dev.vality.newway.serde;
+
+import dev.vality.limiter.config.TimestampedChange;
+import dev.vality.sink.common.parser.impl.MachineEventParser;
+import dev.vality.sink.common.serialization.BinaryDeserializer;
+import org.springframework.stereotype.Service;
+
+@Service
+public class LimitConfigChangeMachineEventParser extends MachineEventParser {
+
+ public LimitConfigChangeMachineEventParser(BinaryDeserializer deserializer) {
+ super(deserializer);
+ }
+}
diff --git a/src/main/java/dev/vality/newway/serde/deserializer/LimitConfigChangeDeserializer.java b/src/main/java/dev/vality/newway/serde/deserializer/LimitConfigChangeDeserializer.java
new file mode 100644
index 00000000..9ed0744e
--- /dev/null
+++ b/src/main/java/dev/vality/newway/serde/deserializer/LimitConfigChangeDeserializer.java
@@ -0,0 +1,14 @@
+package dev.vality.newway.serde.deserializer;
+
+import dev.vality.limiter.config.TimestampedChange;
+import dev.vality.sink.common.serialization.impl.AbstractThriftBinaryDeserializer;
+import org.springframework.stereotype.Service;
+
+@Service
+public class LimitConfigChangeDeserializer extends AbstractThriftBinaryDeserializer {
+
+ @Override
+ public TimestampedChange deserialize(byte[] bin) {
+ return deserialize(bin, new TimestampedChange());
+ }
+}
diff --git a/src/main/java/dev/vality/newway/service/LimitConfigService.java b/src/main/java/dev/vality/newway/service/LimitConfigService.java
new file mode 100644
index 00000000..00cb2c5f
--- /dev/null
+++ b/src/main/java/dev/vality/newway/service/LimitConfigService.java
@@ -0,0 +1,36 @@
+package dev.vality.newway.service;
+
+import dev.vality.limiter.config.TimestampedChange;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.newway.handler.event.stock.impl.limiter.LimitConfigHandler;
+import dev.vality.sink.common.parser.impl.MachineEventParser;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class LimitConfigService {
+
+ private final List handlers;
+ private final MachineEventParser parser;
+
+ @Transactional(propagation = Propagation.REQUIRED)
+ public void handleEvents(List machineEvents) {
+ machineEvents.forEach(this::handleIfAccept);
+ }
+
+ private void handleIfAccept(MachineEvent machineEvent) {
+ TimestampedChange eventPayload = parser.parse(machineEvent);
+ if (eventPayload.isSetChange()) {
+ handlers.stream()
+ .filter(handler -> handler.accept(eventPayload))
+ .forEach(handler -> handler.handle(eventPayload, machineEvent));
+ }
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index b6e1d120..1ac43272 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -66,6 +66,7 @@ kafka:
source-concurrency: 7
destination-concurrency: 7
withdrawal-session-concurrency: 7
+ limit-config-concurrency: 7
topics:
invoice:
id: mg-invoice-100-2
@@ -104,6 +105,9 @@ kafka:
pm-events-payout:
id: pm-events-payout
enabled: false
+ limit-config:
+ id: mg-events-lim-config
+ enabled: false
dmt:
url: http://dominant:8022/v1/domain/repository
diff --git a/src/main/resources/db/migration/V3__add_limit_config.sql b/src/main/resources/db/migration/V3__add_limit_config.sql
new file mode 100644
index 00000000..f5d147bc
--- /dev/null
+++ b/src/main/resources/db/migration/V3__add_limit_config.sql
@@ -0,0 +1,35 @@
+create type dw.limit_config_time_range_type as enum ('calendar', 'interval');
+create type dw.limit_config_time_range_type_calendar as enum ('year', 'month', 'week', 'day');
+create type dw.limit_config_limit_context_type as enum ('payment_processing', 'withdrawal_processing');
+create type dw.limit_config_limit_type_turnover_metric as enum ('number', 'amount');
+create type dw.limit_config_limit_scope as enum ('multi', 'single');
+create type dw.limit_config_limit_scope_type as enum ('party', 'shop', 'wallet', 'identity', 'payment_tool');
+create type dw.limit_config_operation_limit_behaviour as enum ('subtraction', 'addition');
+
+create table if not exists dw.limit_config
+(
+ id bigserial not null,
+ source_id varchar not null,
+ sequence_id int not null,
+ event_occured_at timestamp not null,
+ event_created_at timestamp not null,
+ limit_config_id varchar not null,
+ processor_type varchar not null,
+ created_at timestamp not null,
+ started_at timestamp not null,
+ shard_size bigint not null,
+ time_range_type dw.limit_config_time_range_type not null,
+ time_range_type_calendar dw.limit_config_time_range_type_calendar,
+ time_range_type_interval_amount bigint,
+ limit_context_type dw.limit_config_limit_context_type not null,
+ limit_type_turnover_metric dw.limit_config_limit_type_turnover_metric,
+ limit_type_turnover_metric_amount_currency varchar,
+ limit_scope dw.limit_config_limit_scope,
+ limit_scope_types_json text,
+ description varchar,
+ operation_limit_behaviour dw.limit_config_operation_limit_behaviour,
+ wtime TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (now() at time zone 'utc'),
+ current BOOLEAN NOT NULL DEFAULT TRUE,
+ constraint limit_config_id_pkey primary key (id),
+ constraint limit_config_limit_config_id_ukey unique (limit_config_id, sequence_id)
+);
\ No newline at end of file
diff --git a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java
index 292f3ee6..21722a42 100644
--- a/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java
+++ b/src/test/java/dev/vality/newway/config/KafkaPostgresqlSpringBootITest.java
@@ -27,7 +27,8 @@
"kafka.topics.withdrawal-session.enabled=true",
"kafka.topics.source.enabled=true",
"kafka.topics.destination.enabled=true",
- "kafka.topics.pm-events-payout.enabled=true"},
+ "kafka.topics.pm-events-payout.enabled=true",
+ "kafka.topics.limit-config.enabled=true"},
topicsKeys = {
"kafka.topics.invoice.id",
"kafka.topics.recurrent-payment-tool.id",
@@ -40,7 +41,8 @@
"kafka.topics.withdrawal-session.id",
"kafka.topics.source.id",
"kafka.topics.destination.id",
- "kafka.topics.pm-events-payout.id"})
+ "kafka.topics.pm-events-payout.id",
+ "kafka.topics.limit-config.id"})
@DefaultSpringBootTest
@Import(KafkaProducer.class)
public @interface KafkaPostgresqlSpringBootITest {
diff --git a/src/test/java/dev/vality/newway/dao/LimitConfigDaoTest.java b/src/test/java/dev/vality/newway/dao/LimitConfigDaoTest.java
new file mode 100644
index 00000000..5c45d11e
--- /dev/null
+++ b/src/test/java/dev/vality/newway/dao/LimitConfigDaoTest.java
@@ -0,0 +1,60 @@
+package dev.vality.newway.dao;
+
+import dev.vality.limiter.config.LimitScopeType;
+import dev.vality.mapper.RecordRowMapper;
+import dev.vality.newway.config.PostgresqlSpringBootITest;
+import dev.vality.newway.dao.limiter.LimitConfigDao;
+import dev.vality.newway.domain.tables.pojos.LimitConfig;
+import dev.vality.newway.util.JsonUtil;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.EmptyResultDataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static dev.vality.newway.domain.tables.LimitConfig.LIMIT_CONFIG;
+import static dev.vality.newway.utils.LimitConfigGenerator.getLimitConfig;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@PostgresqlSpringBootITest
+public class LimitConfigDaoTest {
+
+ public static final String SELECT_CURRENT = "select * from dw.limit_config where limit_config_id = ? and current = true;";
+
+ @Autowired
+ private JdbcTemplate jdbcTemplate;
+
+ @Autowired
+ private LimitConfigDao limitConfigDao;
+
+ @Test
+ public void limitConfigDaoTest() {
+ var pojo = dev.vality.testcontainers.annotations.util.RandomBeans.random(LimitConfig.class);
+ pojo.setCurrent(true);
+ pojo.setLimitScopeTypesJson(getLimitScopeTypesJson(getLimitConfig(pojo.getLimitConfigId()).getScope().getMulti()));
+ var id = limitConfigDao.save(pojo).get();
+ pojo.setId(id);
+ var limitConfigId = pojo.getLimitConfigId();
+ var actual = selectCurrent(limitConfigId);
+ assertEquals(pojo, actual);
+ limitConfigDao.updateNotCurrent(actual.getId());
+ limitConfigDao.save(pojo);
+ assertThrows(EmptyResultDataAccessException.class, () -> selectCurrent(pojo.getLimitConfigId()));
+ }
+
+ private LimitConfig selectCurrent(String limitConfigId) {
+ return jdbcTemplate.queryForObject(
+ SELECT_CURRENT,
+ new RecordRowMapper<>(LIMIT_CONFIG, LimitConfig.class),
+ limitConfigId);
+ }
+
+ private String getLimitScopeTypesJson(Set limitScopeTypes) {
+ return JsonUtil.objectToJsonString(limitScopeTypes.stream()
+ .map(JsonUtil::thriftBaseToJsonNode)
+ .collect(Collectors.toList()));
+ }
+}
diff --git a/src/test/java/dev/vality/newway/kafka/KafkaProducer.java b/src/test/java/dev/vality/newway/kafka/KafkaProducer.java
index 1b3ceb06..505161cd 100644
--- a/src/test/java/dev/vality/newway/kafka/KafkaProducer.java
+++ b/src/test/java/dev/vality/newway/kafka/KafkaProducer.java
@@ -24,7 +24,6 @@ public class KafkaProducer {
public void sendMessage(String topic) {
SinkEvent sinkEvent = new SinkEvent();
sinkEvent.setEvent(createMessage());
-
testThriftKafkaProducer.send(topic, sinkEvent);
}
diff --git a/src/test/java/dev/vality/newway/kafka/LimitConfigKafkaListenerTest.java b/src/test/java/dev/vality/newway/kafka/LimitConfigKafkaListenerTest.java
new file mode 100644
index 00000000..9839aa7f
--- /dev/null
+++ b/src/test/java/dev/vality/newway/kafka/LimitConfigKafkaListenerTest.java
@@ -0,0 +1,33 @@
+package dev.vality.newway.kafka;
+
+import dev.vality.newway.config.KafkaPostgresqlSpringBootITest;
+import dev.vality.newway.service.LimitConfigService;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.mock.mockito.MockBean;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.anyList;
+
+@KafkaPostgresqlSpringBootITest
+public class LimitConfigKafkaListenerTest {
+
+ @Value("${kafka.topics.limit-config.id}")
+ public String topic;
+
+ @Autowired
+ private KafkaProducer kafkaProducer;
+
+ @MockBean
+ private LimitConfigService limitConfigService;
+
+ @Test
+ public void listenEmptyChanges() {
+ kafkaProducer.sendMessage(topic);
+ Mockito.verify(limitConfigService, Mockito.timeout(TimeUnit.MINUTES.toMillis(1)).times(1))
+ .handleEvents(anyList());
+ }
+}
diff --git a/src/test/java/dev/vality/newway/service/LimitConfigServiceTest.java b/src/test/java/dev/vality/newway/service/LimitConfigServiceTest.java
new file mode 100644
index 00000000..c3f82607
--- /dev/null
+++ b/src/test/java/dev/vality/newway/service/LimitConfigServiceTest.java
@@ -0,0 +1,75 @@
+package dev.vality.newway.service;
+
+import dev.vality.limiter.config.LimitConfig;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import dev.vality.mapper.RecordRowMapper;
+import dev.vality.newway.config.PostgresqlSpringBootITest;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.test.jdbc.JdbcTestUtils;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static dev.vality.newway.dao.LimitConfigDaoTest.SELECT_CURRENT;
+import static dev.vality.newway.domain.tables.LimitConfig.LIMIT_CONFIG;
+import static dev.vality.newway.utils.LimitConfigGenerator.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@PostgresqlSpringBootITest
+public class LimitConfigServiceTest {
+
+ private static final String TABLE_NAME = LIMIT_CONFIG.getSchema().getName() + "." + LIMIT_CONFIG.getName();
+
+ @Autowired
+ private LimitConfigService limitConfigService;
+
+ @Autowired
+ private JdbcTemplate jdbcTemplate;
+
+ @Test
+ public void shouldHandleAndSave() {
+ var limitConfigId = UUID.randomUUID().toString();
+ var limitConfig = getLimitConfig(limitConfigId);
+ limitConfigService.handleEvents(List.of(
+ getMachineEvent(limitConfigId, limitConfig), getMachineEvent(UUID.randomUUID().toString())));
+ assertThat(JdbcTestUtils.countRowsInTable(jdbcTemplate, TABLE_NAME))
+ .isEqualTo(2);
+ var saved = selectCurrent(limitConfigId);
+ assertThat(saved.getShardSize())
+ .isEqualTo(limitConfig.getShardSize());
+ }
+
+ @Test
+ public void shouldHandleAndSaveWithZeroScope() {
+ var limitConfigId = UUID.randomUUID().toString();
+ var limitConfig = getLimitConfig(limitConfigId);
+ limitConfig.getScope().setMulti(Set.of());
+ limitConfigService.handleEvents(List.of(getMachineEvent(limitConfigId, limitConfig)));
+ assertThat(JdbcTestUtils.countRowsInTable(jdbcTemplate, TABLE_NAME))
+ .isEqualTo(1);
+ var saved = selectCurrent(limitConfigId);
+ assertThat(saved.getShardSize())
+ .isEqualTo(limitConfig.getShardSize());
+ }
+
+ private dev.vality.newway.domain.tables.pojos.LimitConfig selectCurrent(String limitConfigId) {
+ return jdbcTemplate.queryForObject(
+ SELECT_CURRENT,
+ new RecordRowMapper<>(LIMIT_CONFIG, dev.vality.newway.domain.tables.pojos.LimitConfig.class),
+ limitConfigId);
+ }
+
+ private MachineEvent getMachineEvent(String limitConfigId) {
+ return getMachineEvent(limitConfigId, getLimitConfig(limitConfigId));
+ }
+
+ private MachineEvent getMachineEvent(String limitConfigId, LimitConfig limitConfig) {
+ return getEvent(
+ limitConfigId,
+ 1,
+ getCreated(limitConfig));
+ }
+}
diff --git a/src/test/java/dev/vality/newway/utils/LimitConfigGenerator.java b/src/test/java/dev/vality/newway/utils/LimitConfigGenerator.java
new file mode 100644
index 00000000..0a74ff2f
--- /dev/null
+++ b/src/test/java/dev/vality/newway/utils/LimitConfigGenerator.java
@@ -0,0 +1,67 @@
+package dev.vality.newway.utils;
+
+import dev.vality.geck.common.util.TypeUtil;
+import dev.vality.limiter.config.*;
+import dev.vality.machinegun.eventsink.MachineEvent;
+import lombok.SneakyThrows;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Set;
+
+import static dev.vality.testcontainers.annotations.util.RandomBeans.randomThriftOnlyRequiredFields;
+
+public class LimitConfigGenerator {
+
+ public static List getEvents(
+ String sourceId,
+ long sequenceId,
+ Change change) {
+ return List.of(getEvent(sourceId, sequenceId, change));
+ }
+
+ public static MachineEvent getEvent(
+ String sourceId,
+ long sequenceId,
+ Change change) {
+ return new MachineEvent()
+ .setData(toByteArray(new TimestampedChange()
+ .setChange(change)
+ .setOccuredAt(getTemporal())))
+ .setCreatedAt(Instant.now().toString())
+ .setEventId(sequenceId)
+ .setSourceNs("source_ns")
+ .setSourceId(sourceId);
+ }
+
+ public static Change getCreated(LimitConfig limitConfig) {
+ return Change.created(new CreatedChange(limitConfig));
+ }
+
+ public static LimitConfig getLimitConfig(String limitConfigId) {
+ return randomThriftOnlyRequiredFields(LimitConfig.class)
+ .setId(limitConfigId)
+ .setType(LimitType.turnover(new LimitTypeTurnover().setMetric(LimitTurnoverMetric.amount(new LimitTurnoverAmount("RUB")))))
+ .setScope(LimitScope.multi(Set.of(LimitScopeType.identity(new LimitScopeEmptyDetails()),
+ LimitScopeType.party(new LimitScopeEmptyDetails()),
+ LimitScopeType.shop(new LimitScopeEmptyDetails()))))
+ .setDescription("asd")
+ .setCreatedAt(getTemporal())
+ .setStartedAt(getTemporal())
+ .setOpBehaviour(new OperationLimitBehaviour().setInvoicePaymentRefund(OperationBehaviour.addition(new Addition())));
+ }
+
+ @SneakyThrows
+ private static dev.vality.machinegun.msgpack.Value toByteArray(TBase, ?> thrift) {
+ return dev.vality.machinegun.msgpack.Value.bin(
+ new TSerializer(new TBinaryProtocol.Factory())
+ .serialize(thrift));
+ }
+
+ private static String getTemporal() {
+ return TypeUtil.temporalToString(Instant.now());
+ }
+}