Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>dev.vality</groupId>
<artifactId>service-parent-pom</artifactId>
<version>3.1.7</version>
<version>3.1.9</version>
</parent>

<artifactId>wallets-hooker</artifactId>
Expand Down Expand Up @@ -133,7 +133,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>swag-wallets-webhook-events-server</artifactId>
<version>1.41-1174f82</version>
<version>1.42-d30b8f0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
Expand Down Expand Up @@ -184,7 +184,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>testcontainers-annotations</artifactId>
<version>3.0.2</version>
<version>4.0.0</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package dev.vality.wallets.hooker.config;

import dev.vality.fistful.withdrawal.ManagementSrv;
import dev.vality.woody.thrift.impl.http.THSpawnClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.net.URI;

@Configuration
public class FistfulClientConfig {

@Value("${service.withdrawal.url}")
private String withdrawalUrl;

@Value("${service.withdrawal.networkTimeout}")
private int networkTimeout;

@Bean
public ManagementSrv.Iface withdrawalFistfulClient() throws IOException {
return new THSpawnClientBuilder()
.withNetworkTimeout(networkTimeout)
.withAddress(URI.create(withdrawalUrl))
.build(ManagementSrv.Iface.class);
}
}
43 changes: 43 additions & 0 deletions src/main/java/dev/vality/wallets/hooker/config/RetryConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package dev.vality.wallets.hooker.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Slf4j
@Configuration
public class RetryConfig {

@Value("${service.withdrawal.retry.maxAttempts}")
private int maxAttempts;

@Value("${service.withdrawal.retry.initialIntervalMs}")
private long initialIntervalMs;

@Value("${service.withdrawal.retry.multiplier}")
private double multiplier;

@Value("${service.withdrawal.retry.maxIntervalMs}")
private long maxIntervalMs;

@Bean
public RetryTemplate withdrawalRetryTemplate() {
var retryTemplate = new RetryTemplate();

var retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(maxAttempts);
retryTemplate.setRetryPolicy(retryPolicy);

var backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(initialIntervalMs);
backOffPolicy.setMultiplier(multiplier);
backOffPolicy.setMaxInterval(maxIntervalMs);
retryTemplate.setBackOffPolicy(backOffPolicy);

return retryTemplate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import dev.vality.fistful.withdrawal.StatusChange;
import dev.vality.fistful.withdrawal.status.Status;
import dev.vality.swag.wallets.webhook.events.model.Event;
import dev.vality.swag.wallets.webhook.events.model.Fee;
import dev.vality.swag.wallets.webhook.events.model.WithdrawalFailed;
import dev.vality.swag.wallets.webhook.events.model.WithdrawalSucceeded;
import dev.vality.wallets.hooker.domain.WebHookModel;
Expand All @@ -14,6 +15,8 @@
import dev.vality.wallets.hooker.model.MessageGenParams;
import dev.vality.wallets.hooker.service.BaseHookMessageGenerator;
import dev.vality.wallets.hooker.service.WebHookMessageGeneratorServiceImpl;
import dev.vality.wallets.hooker.service.WithdrawalClient;
import dev.vality.wallets.hooker.utils.CashFlowUtils;
import dev.vality.webhook.dispatcher.WebhookMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -29,16 +32,19 @@ public class WithdrawalStatusChangedHookMessageGenerator extends BaseHookMessage
private final WebHookMessageGeneratorServiceImpl<StatusChange> generatorService;
private final ObjectMapper objectMapper;
private final AdditionalHeadersGenerator additionalHeadersGenerator;
private final WithdrawalClient withdrawalClient;

public WithdrawalStatusChangedHookMessageGenerator(
WebHookMessageGeneratorServiceImpl<StatusChange> generatorService,
ObjectMapper objectMapper,
AdditionalHeadersGenerator additionalHeadersGenerator,
WithdrawalClient withdrawalClient,
@Value("${parent.not.exist.id}") Long parentId) {
super(parentId);
this.generatorService = generatorService;
this.objectMapper = objectMapper;
this.additionalHeadersGenerator = additionalHeadersGenerator;
this.withdrawalClient = withdrawalClient;
}

@Override
Expand Down Expand Up @@ -97,9 +103,11 @@ private String initRequestBody(
withdrawalFailed.setTopic(Event.TopicEnum.WITHDRAWAL_TOPIC);
return objectMapper.writeValueAsString(withdrawalFailed);
} else if (status.isSetSucceeded()) {
Fee fee = calculateFee(withdrawalId, eventId);
WithdrawalSucceeded withdrawalSucceeded = new WithdrawalSucceeded()
.withdrawalID(withdrawalId)
.externalID(externalId);
.externalID(externalId)
.fee(fee);
withdrawalSucceeded.setEventType(Event.EventTypeEnum.WITHDRAWAL_SUCCEEDED);
withdrawalSucceeded.setEventID(eventId.toString());
withdrawalSucceeded.setOccuredAt(OffsetDateTime.parse(createdAt));
Expand All @@ -114,4 +122,24 @@ private String initRequestBody(
}
}

private Fee calculateFee(String withdrawalId, Long eventId) {
try {
var withdrawalState = withdrawalClient.getWithdrawalInfo(withdrawalId, eventId);
if (withdrawalState != null
&& withdrawalState.getEffectiveFinalCashFlow() != null
&& withdrawalState.getEffectiveFinalCashFlow().getPostings() != null) {
long amount = CashFlowUtils.getWithdrawalFee(withdrawalState.getEffectiveFinalCashFlow().getPostings());
String currency = withdrawalState.getBody().getCurrency().getSymbolicCode();
return new Fee().amount(amount).currency(currency);
}
log.warn("Unable to calculate fee for withdrawalId={}, eventId={}: missing cash flow data",
withdrawalId, eventId);
return null;
} catch (Exception e) {
log.warn("Error calculating fee for withdrawalId={}, eventId={}: {}",
withdrawalId, eventId, e.getMessage());
return null;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package dev.vality.wallets.hooker.service;

import dev.vality.fistful.base.EventRange;
import dev.vality.fistful.withdrawal.ManagementSrv;
import dev.vality.fistful.withdrawal.WithdrawalState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class WithdrawalClient {

private final ManagementSrv.Iface withdrawalFistfulClient;
private final RetryTemplate retryTemplate;

public WithdrawalState getWithdrawalInfo(String withdrawalId, long eventId) {
try {
return retryTemplate.execute(context -> {
log.debug("Attempt {} to fetch withdrawal state for withdrawalId={}, eventId={}",
context.getRetryCount() + 1, withdrawalId, eventId);

WithdrawalState withdrawalState = withdrawalFistfulClient.get(withdrawalId, createEventRange(eventId));
if (withdrawalState == null) {
log.warn("Withdrawal not found for withdrawalId={}, eventId={} (attempt {})",
withdrawalId, eventId, context.getRetryCount() + 1);
throw new RuntimeException("Withdrawal not found!");
}

log.debug("Successfully fetched withdrawal state for withdrawalId={} (attempt {})",
withdrawalId, context.getRetryCount() + 1);
return withdrawalState;
});
} catch (Exception e) {
log.error("Error fetching withdrawal state for withdrawalId={}, eventId={}", withdrawalId, eventId, e);
throw new RuntimeException("Failed to fetch withdrawal state", e);
}
}

private EventRange createEventRange(long eventId) {
return new EventRange().setLimit((int) eventId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package dev.vality.wallets.hooker.utils;

import dev.vality.fistful.cashflow.FinalCashFlowPosting;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.List;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CashFlowUtils {

public static long getWithdrawalFee(List<FinalCashFlowPosting> postings) {
return postings.stream()
.filter(posting -> posting.getSource().getAccountType().isSetWallet()
&& posting.getDestination().getAccountType().isSetSystem())
.map(posting -> posting.getVolume().getAmount())
.reduce(0L, Long::sum);
}
}
13 changes: 12 additions & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,19 @@ parent:
exist:
id: -1

service:
withdrawal:
url: http://fistful:8022/v1/
networkTimeout: 5000
retry:
maxAttempts: 10
initialIntervalMs: 1000
multiplier: 2.0
maxIntervalMs: 10000

testcontainers:
postgresql:
tag: '11.4'
kafka:
tag: '6.2.0'
confluent:
tag: '7.8.0'
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package dev.vality.wallets.hooker.config;

import dev.vality.testcontainers.annotations.KafkaConfig;
import dev.vality.testcontainers.annotations.KafkaTestConfig;
import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainerSingleton;
import dev.vality.testcontainers.annotations.kafka.constants.Provider;
import dev.vality.testcontainers.annotations.postgresql.PostgresqlTestcontainerSingleton;
import org.springframework.boot.test.context.SpringBootTest;

Expand All @@ -14,6 +15,7 @@
@Retention(RetentionPolicy.RUNTIME)
@PostgresqlTestcontainerSingleton
@KafkaTestcontainerSingleton(
provider = Provider.CONFLUENT,
properties = {
"kafka.topic.wallet.listener.enabled=true",
"kafka.topic.withdrawal.listener.enabled=true",
Expand All @@ -24,6 +26,6 @@
"kafka.topic.withdrawal.name",
"kafka.topic.destination.name"})
@SpringBootTest
@KafkaConfig
@KafkaTestConfig
public @interface KafkaPostgresqlSpringBootITest {
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import dev.vality.wallets.hooker.domain.WebHookModel;
import dev.vality.wallets.hooker.domain.enums.EventType;
import dev.vality.wallets.hooker.domain.tables.pojos.Webhook;
import dev.vality.wallets.hooker.service.WithdrawalClient;
import dev.vality.fistful.withdrawal.ManagementSrv;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.bean.override.mockito.MockitoBean;

import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -21,6 +24,12 @@ public class WebHookDaoImplTest {
@Autowired
private WebHookDao webHookDao;

@MockitoBean
private WithdrawalClient withdrawalClient;

@MockitoBean
private ManagementSrv.Iface withdrawalFistfulClient;

@Test
void create() {
LinkedHashSet<EventType> eventTypes = new LinkedHashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import dev.vality.wallets.hooker.domain.WebHookModel;
import dev.vality.wallets.hooker.domain.enums.EventType;
import dev.vality.wallets.hooker.service.WebHookMessageSenderService;
import dev.vality.wallets.hooker.service.WithdrawalClient;
import dev.vality.fistful.withdrawal.ManagementSrv;
import dev.vality.wallets.hooker.service.kafka.DestinationEventService;
import dev.vality.wallets.hooker.service.kafka.WithdrawalEventService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -35,6 +36,12 @@ class DestinationEventHandlerTest {
@MockitoBean
private WebHookMessageSenderService webHookMessageSenderService;

@MockitoBean
private WithdrawalClient withdrawalClient;

@MockitoBean
private ManagementSrv.Iface withdrawalFistfulClient;

@Test
void failHandleDestinationCreated() {
WebHookModel webhook = TestBeanFactory.createWebhookModel();
Expand All @@ -61,4 +68,4 @@ void handleDestinationCreatedAndAccountChange() {
verify(webHookMessageSenderService, times(1))
.send(any());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import dev.vality.wallets.hooker.dao.webhook.WebHookDao;
import dev.vality.wallets.hooker.domain.WebHookModel;
import dev.vality.wallets.hooker.service.WebHookMessageSenderService;
import dev.vality.wallets.hooker.service.WithdrawalClient;
import dev.vality.fistful.withdrawal.ManagementSrv;
import dev.vality.wallets.hooker.service.kafka.WithdrawalEventService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -29,6 +31,12 @@ class WithdrawalEventHandlerTest {
@MockitoBean
private WebHookMessageSenderService webHookMessageSenderService;

@MockitoBean
private WithdrawalClient withdrawalClient;

@MockitoBean
private ManagementSrv.Iface withdrawalFistfulClient;

@Test
void handleWithdrawalCreatedAndAndStatusChange() throws InterruptedException {
WebHookModel webhook = TestBeanFactory.createWebhookModel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.machinegun.eventsink.SinkEvent;
import dev.vality.testcontainers.annotations.kafka.config.KafkaProducerConfig;
import dev.vality.testcontainers.annotations.kafka.config.KafkaProducerTestConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TBase;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -13,7 +13,7 @@
import java.time.format.DateTimeFormatter;

@TestComponent
@Import(KafkaProducerConfig.class)
@Import(KafkaProducerTestConfig.class)
@Slf4j
public class KafkaProducer {

Expand Down
Loading
Loading