Skip to content
This repository was archived by the owner on Oct 10, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@
<artifactId>xrates-proto</artifactId>
<version>1.23-bf0d62d</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>limiter-proto</artifactId>
<version>1.32-6158184</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>
<artifactId>shared-resources</artifactId>
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/dev/vality/newway/config/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,10 @@
import dev.vality.newway.serde.SinkEventDeserializer;
import dev.vality.payout.manager.Event;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
Expand All @@ -24,8 +20,6 @@
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

@Configuration
Expand Down Expand Up @@ -136,6 +130,12 @@ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getPartyManagementConcurrency());
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> limitConfigContainerFactory(
ConsumerFactory<String, MachineEvent> consumerFactory) {
return createConcurrentFactory(consumerFactory, kafkaConsumerProperties.getLimitConfigConcurrency());
}

private KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MachineEvent>> createConcurrentFactory(
ConsumerFactory<String, MachineEvent> consumerFactory, int threadsNumber) {
ConcurrentKafkaListenerContainerFactory<String, MachineEvent> factory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ public class KafkaConsumerProperties {
private int sourceConcurrency;
private int destinationConcurrency;
private int withdrawalSessionConcurrency;
private int limitConfigConcurrency;

}
15 changes: 15 additions & 0 deletions src/main/java/dev/vality/newway/dao/limiter/LimitConfigDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dev.vality.newway.dao.limiter;

import dev.vality.dao.GenericDao;
import dev.vality.newway.domain.tables.pojos.LimitConfig;
import dev.vality.newway.exception.DaoException;

import java.util.Optional;

public interface LimitConfigDao extends GenericDao {

Optional<Long> save(LimitConfig limitConfig) throws DaoException;

void updateNotCurrent(Long id) throws DaoException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package dev.vality.newway.dao.limiter;

import dev.vality.dao.impl.AbstractGenericDao;
import dev.vality.mapper.RecordRowMapper;
import dev.vality.newway.domain.tables.pojos.LimitConfig;
import dev.vality.newway.exception.DaoException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.GeneratedKeyHolder;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.Optional;

import static dev.vality.newway.domain.Tables.LIMIT_CONFIG;

@Component
public class LimitConfigDaoImpl extends AbstractGenericDao implements LimitConfigDao {

private final RowMapper<LimitConfig> limitConfigRowMapper;

@Autowired
public LimitConfigDaoImpl(DataSource dataSource) {
super(dataSource);
limitConfigRowMapper = new RecordRowMapper<>(LIMIT_CONFIG, LimitConfig.class);
}

@Override
public Optional<Long> save(LimitConfig limitConfig) throws DaoException {
var limitConfigRecord = getDslContext().newRecord(LIMIT_CONFIG, limitConfig);
var query = getDslContext()
.insertInto(LIMIT_CONFIG)
.set(limitConfigRecord)
.onConflict(LIMIT_CONFIG.LIMIT_CONFIG_ID, LIMIT_CONFIG.SEQUENCE_ID)
.doNothing()
.returning(LIMIT_CONFIG.ID);
GeneratedKeyHolder keyHolder = new GeneratedKeyHolder();
execute(query, keyHolder);
return Optional.ofNullable(keyHolder.getKey()).map(Number::longValue);
}

@Override
public void updateNotCurrent(Long id) throws DaoException {
var query = getDslContext().update(LIMIT_CONFIG).set(LIMIT_CONFIG.CURRENT, false)
.where(LIMIT_CONFIG.ID.eq(id)
.and(LIMIT_CONFIG.CURRENT));
executeOne(query);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.vality.newway.factory.machine.event;

import dev.vality.geck.common.util.TypeUtil;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.domain.tables.pojos.LimitConfig;
import org.springframework.stereotype.Component;

@Component
public class LimitConfigMachineEventCopyFactoryImpl implements MachineEventCopyFactory<LimitConfig, String> {

@Override
public LimitConfig create(MachineEvent event, Long sequenceId, String id, LimitConfig old, String occurredAt) {
var limitConfig = old != null ? new LimitConfig(old) : new LimitConfig();
limitConfig.setId(null);
limitConfig.setWtime(null);
limitConfig.setSequenceId(sequenceId.intValue());
limitConfig.setSourceId(id);
limitConfig.setEventCreatedAt(TypeUtil.stringToLocalDateTime(event.getCreatedAt()));
limitConfig.setEventOccuredAt(TypeUtil.stringToLocalDateTime(occurredAt));
return limitConfig;
}

@Override
public LimitConfig create(MachineEvent event, Long sequenceId, String id, String occurredAt) {
return create(event, sequenceId, id, null, occurredAt);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package dev.vality.newway.handler.event.stock;


import dev.vality.geck.filter.Filter;
import org.apache.commons.lang3.NotImplementedException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package dev.vality.newway.handler.event.stock.impl.limiter;

import dev.vality.geck.common.util.TBaseUtil;
import dev.vality.geck.common.util.TypeUtil;
import dev.vality.geck.filter.Filter;
import dev.vality.geck.filter.PathConditionFilter;
import dev.vality.geck.filter.condition.IsNullCondition;
import dev.vality.geck.filter.rule.PathConditionRule;
import dev.vality.limiter.config.LimitScopeType;
import dev.vality.limiter.config.TimestampedChange;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.dao.limiter.LimitConfigDao;
import dev.vality.newway.domain.enums.*;
import dev.vality.newway.domain.tables.pojos.LimitConfig;
import dev.vality.newway.factory.machine.event.MachineEventCopyFactory;
import dev.vality.newway.util.JsonUtil;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class LimitConfigCreatedHandler implements LimitConfigHandler {

private final LimitConfigDao limitConfigDao;
private final MachineEventCopyFactory<LimitConfig, String> limitConfigMachineEventCopyFactory;

@Getter
private final Filter filter = new PathConditionFilter(new PathConditionRule(
"change.created",
new IsNullCondition().not()));

@Override
@Transactional(propagation = Propagation.REQUIRED)
public void handle(TimestampedChange timestampedChange, MachineEvent event) {
var change = timestampedChange.getChange();
var limitConfigSource = change.getCreated().getLimitConfig();
var limitConfigId = limitConfigSource.getId();
var sequenceId = event.getEventId();
var sourceId = event.getSourceId();
log.info("Start LimitConfig created handling, sequenceId={}, limitConfigId={}", sequenceId, limitConfigId);
var limitConfig = limitConfigMachineEventCopyFactory.create(
event, sequenceId, sourceId, timestampedChange.getOccuredAt());
limitConfig.setLimitConfigId(limitConfigId);
limitConfig.setProcessorType(limitConfigSource.getProcessorType());
limitConfig.setCreatedAt(TypeUtil.stringToLocalDateTime(limitConfigSource.getCreatedAt()));
limitConfig.setStartedAt(TypeUtil.stringToLocalDateTime(limitConfigSource.getStartedAt()));
limitConfig.setShardSize(limitConfigSource.getShardSize());
if (limitConfigSource.isSetTimeRangeType()) {
limitConfig.setTimeRangeType(TBaseUtil.unionFieldToEnum(
limitConfigSource.getTimeRangeType(), LimitConfigTimeRangeType.class));
switch (limitConfigSource.getTimeRangeType().getSetField()) {
case CALENDAR -> limitConfig.setTimeRangeTypeCalendar(TBaseUtil.unionFieldToEnum(
limitConfigSource.getTimeRangeType().getCalendar(),
LimitConfigTimeRangeTypeCalendar.class));
case INTERVAL -> limitConfig.setTimeRangeTypeIntervalAmount(
limitConfigSource.getTimeRangeType().getInterval().getAmount());
}
}
if (limitConfigSource.isSetContextType()) {
limitConfig.setLimitContextType(TBaseUtil.unionFieldToEnum(
limitConfigSource.getContextType(), LimitConfigLimitContextType.class));
}
if (limitConfigSource.isSetType()) {
if (limitConfigSource.getType().isSetTurnover()) {
var turnover = limitConfigSource.getType().getTurnover();
if (turnover.isSetMetric()) {
var metric = turnover.getMetric();
limitConfig.setLimitTypeTurnoverMetric(
TBaseUtil.unionFieldToEnum(metric, LimitConfigLimitTypeTurnoverMetric.class));
if (metric.isSetAmount()) {
limitConfig.setLimitTypeTurnoverMetricAmountCurrency(metric.getAmount().getCurrency());
}
}
}
}
if (limitConfigSource.isSetScope()) {
limitConfig.setLimitScope(TBaseUtil.unionFieldToEnum(
limitConfigSource.getScope(), LimitConfigLimitScope.class));
switch (limitConfigSource.getScope().getSetField()) {
case SINGLE -> limitConfig.setLimitScopeTypesJson(
getLimitScopeTypesJson(Set.of(limitConfigSource.getScope().getSingle())));
case MULTI -> limitConfig.setLimitScopeTypesJson(
getLimitScopeTypesJson(limitConfigSource.getScope().getMulti()));
}
}
limitConfig.setDescription(limitConfigSource.getDescription());
if (limitConfigSource.isSetOpBehaviour() && limitConfigSource.getOpBehaviour().isSetInvoicePaymentRefund()) {
limitConfig.setOperationLimitBehaviour(TBaseUtil.unionFieldToEnum(
limitConfigSource.getOpBehaviour().getInvoicePaymentRefund(),
LimitConfigOperationLimitBehaviour.class));
}

limitConfigDao.save(limitConfig).ifPresentOrElse(
dbContractId -> log.info("LimitConfig created has been saved, sequenceId={}, limitConfigId={}",
sequenceId, limitConfigId),
() -> log.info("LimitConfig created bound duplicated, sequenceId={}, limitConfigId={}",
sequenceId, limitConfigId));
}

private String getLimitScopeTypesJson(Set<LimitScopeType> limitScopeTypes) {
return JsonUtil.objectToJsonString(limitScopeTypes.stream()
.map(JsonUtil::thriftBaseToJsonNode)
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package dev.vality.newway.handler.event.stock.impl.limiter;

import dev.vality.limiter.config.TimestampedChange;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.handler.event.stock.Handler;

public interface LimitConfigHandler extends Handler<TimestampedChange, MachineEvent> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package dev.vality.newway.listener;

import dev.vality.kafka.common.util.LogUtil;
import dev.vality.machinegun.eventsink.SinkEvent;
import dev.vality.newway.service.LimitConfigService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@RequiredArgsConstructor
@Service
public class LimitConfigKafkaListener {

private final LimitConfigService limitConfigService;

@KafkaListener(
autoStartup = "${kafka.topics.limit-config.enabled}",
topics = "${kafka.topics.limit-config.id}",
containerFactory = "limitConfigContainerFactory")
public void handle(List<ConsumerRecord<String, SinkEvent>> messages, Acknowledgment ack) {
log.info("Got machineEvent batch with size: {}", messages.size());
limitConfigService.handleEvents(messages.stream()
.map(m -> m.value().getEvent())
.collect(Collectors.toList()));
ack.acknowledge();
log.info("Batch has been committed, size={}, {}", messages.size(),
LogUtil.toSummaryStringWithSinkEventValues(messages));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dev.vality.newway.serde;

import dev.vality.limiter.config.TimestampedChange;
import dev.vality.sink.common.parser.impl.MachineEventParser;
import dev.vality.sink.common.serialization.BinaryDeserializer;
import org.springframework.stereotype.Service;

@Service
public class LimitConfigChangeMachineEventParser extends MachineEventParser<TimestampedChange> {

public LimitConfigChangeMachineEventParser(BinaryDeserializer<TimestampedChange> deserializer) {
super(deserializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dev.vality.newway.serde.deserializer;

import dev.vality.limiter.config.TimestampedChange;
import dev.vality.sink.common.serialization.impl.AbstractThriftBinaryDeserializer;
import org.springframework.stereotype.Service;

@Service
public class LimitConfigChangeDeserializer extends AbstractThriftBinaryDeserializer<TimestampedChange> {

@Override
public TimestampedChange deserialize(byte[] bin) {
return deserialize(bin, new TimestampedChange());
}
}
36 changes: 36 additions & 0 deletions src/main/java/dev/vality/newway/service/LimitConfigService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package dev.vality.newway.service;

import dev.vality.limiter.config.TimestampedChange;
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.newway.handler.event.stock.impl.limiter.LimitConfigHandler;
import dev.vality.sink.common.parser.impl.MachineEventParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Slf4j
@Service
@RequiredArgsConstructor
public class LimitConfigService {

private final List<LimitConfigHandler> handlers;
private final MachineEventParser<TimestampedChange> parser;

@Transactional(propagation = Propagation.REQUIRED)
public void handleEvents(List<MachineEvent> machineEvents) {
machineEvents.forEach(this::handleIfAccept);
}

private void handleIfAccept(MachineEvent machineEvent) {
TimestampedChange eventPayload = parser.parse(machineEvent);
if (eventPayload.isSetChange()) {
handlers.stream()
.filter(handler -> handler.accept(eventPayload))
.forEach(handler -> handler.handle(eventPayload, machineEvent));
}
}
}
Loading