diff --git a/pom.xml b/pom.xml
index 6b882d7..7b5d431 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
dev.vality
service-parent-pom
- 3.1.7
+ 3.1.9
wallets-hooker
@@ -133,7 +133,7 @@
dev.vality
swag-wallets-webhook-events-server
- 1.41-1174f82
+ 1.42-d30b8f0
com.google.guava
@@ -184,7 +184,7 @@
dev.vality
testcontainers-annotations
- 3.0.2
+ 4.0.0
test
diff --git a/src/main/java/dev/vality/wallets/hooker/config/FistfulClientConfig.java b/src/main/java/dev/vality/wallets/hooker/config/FistfulClientConfig.java
new file mode 100644
index 0000000..c7fe96b
--- /dev/null
+++ b/src/main/java/dev/vality/wallets/hooker/config/FistfulClientConfig.java
@@ -0,0 +1,28 @@
+package dev.vality.wallets.hooker.config;
+
+import dev.vality.fistful.withdrawal.ManagementSrv;
+import dev.vality.woody.thrift.impl.http.THSpawnClientBuilder;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.IOException;
+import java.net.URI;
+
+@Configuration
+public class FistfulClientConfig {
+
+ @Value("${service.withdrawal.url}")
+ private String withdrawalUrl;
+
+ @Value("${service.withdrawal.networkTimeout}")
+ private int networkTimeout;
+
+ @Bean
+ public ManagementSrv.Iface withdrawalFistfulClient() throws IOException {
+ return new THSpawnClientBuilder()
+ .withNetworkTimeout(networkTimeout)
+ .withAddress(URI.create(withdrawalUrl))
+ .build(ManagementSrv.Iface.class);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/dev/vality/wallets/hooker/config/RetryConfig.java b/src/main/java/dev/vality/wallets/hooker/config/RetryConfig.java
new file mode 100644
index 0000000..3f8a762
--- /dev/null
+++ b/src/main/java/dev/vality/wallets/hooker/config/RetryConfig.java
@@ -0,0 +1,43 @@
+package dev.vality.wallets.hooker.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.retry.backoff.ExponentialBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
+
+@Slf4j
+@Configuration
+public class RetryConfig {
+
+ @Value("${service.withdrawal.retry.maxAttempts}")
+ private int maxAttempts;
+
+ @Value("${service.withdrawal.retry.initialIntervalMs}")
+ private long initialIntervalMs;
+
+ @Value("${service.withdrawal.retry.multiplier}")
+ private double multiplier;
+
+ @Value("${service.withdrawal.retry.maxIntervalMs}")
+ private long maxIntervalMs;
+
+ @Bean
+ public RetryTemplate withdrawalRetryTemplate() {
+ var retryTemplate = new RetryTemplate();
+
+ var retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(maxAttempts);
+ retryTemplate.setRetryPolicy(retryPolicy);
+
+ var backOffPolicy = new ExponentialBackOffPolicy();
+ backOffPolicy.setInitialInterval(initialIntervalMs);
+ backOffPolicy.setMultiplier(multiplier);
+ backOffPolicy.setMaxInterval(maxIntervalMs);
+ retryTemplate.setBackOffPolicy(backOffPolicy);
+
+ return retryTemplate;
+ }
+}
diff --git a/src/main/java/dev/vality/wallets/hooker/handler/withdrawal/generator/WithdrawalStatusChangedHookMessageGenerator.java b/src/main/java/dev/vality/wallets/hooker/handler/withdrawal/generator/WithdrawalStatusChangedHookMessageGenerator.java
index 7887277..987ded3 100644
--- a/src/main/java/dev/vality/wallets/hooker/handler/withdrawal/generator/WithdrawalStatusChangedHookMessageGenerator.java
+++ b/src/main/java/dev/vality/wallets/hooker/handler/withdrawal/generator/WithdrawalStatusChangedHookMessageGenerator.java
@@ -5,6 +5,7 @@
import dev.vality.fistful.withdrawal.StatusChange;
import dev.vality.fistful.withdrawal.status.Status;
import dev.vality.swag.wallets.webhook.events.model.Event;
+import dev.vality.swag.wallets.webhook.events.model.Fee;
import dev.vality.swag.wallets.webhook.events.model.WithdrawalFailed;
import dev.vality.swag.wallets.webhook.events.model.WithdrawalSucceeded;
import dev.vality.wallets.hooker.domain.WebHookModel;
@@ -14,6 +15,8 @@
import dev.vality.wallets.hooker.model.MessageGenParams;
import dev.vality.wallets.hooker.service.BaseHookMessageGenerator;
import dev.vality.wallets.hooker.service.WebHookMessageGeneratorServiceImpl;
+import dev.vality.wallets.hooker.service.WithdrawalClient;
+import dev.vality.wallets.hooker.utils.CashFlowUtils;
import dev.vality.webhook.dispatcher.WebhookMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@@ -29,16 +32,19 @@ public class WithdrawalStatusChangedHookMessageGenerator extends BaseHookMessage
private final WebHookMessageGeneratorServiceImpl generatorService;
private final ObjectMapper objectMapper;
private final AdditionalHeadersGenerator additionalHeadersGenerator;
+ private final WithdrawalClient withdrawalClient;
public WithdrawalStatusChangedHookMessageGenerator(
WebHookMessageGeneratorServiceImpl generatorService,
ObjectMapper objectMapper,
AdditionalHeadersGenerator additionalHeadersGenerator,
+ WithdrawalClient withdrawalClient,
@Value("${parent.not.exist.id}") Long parentId) {
super(parentId);
this.generatorService = generatorService;
this.objectMapper = objectMapper;
this.additionalHeadersGenerator = additionalHeadersGenerator;
+ this.withdrawalClient = withdrawalClient;
}
@Override
@@ -97,9 +103,11 @@ private String initRequestBody(
withdrawalFailed.setTopic(Event.TopicEnum.WITHDRAWAL_TOPIC);
return objectMapper.writeValueAsString(withdrawalFailed);
} else if (status.isSetSucceeded()) {
+ Fee fee = calculateFee(withdrawalId, eventId);
WithdrawalSucceeded withdrawalSucceeded = new WithdrawalSucceeded()
.withdrawalID(withdrawalId)
- .externalID(externalId);
+ .externalID(externalId)
+ .fee(fee);
withdrawalSucceeded.setEventType(Event.EventTypeEnum.WITHDRAWAL_SUCCEEDED);
withdrawalSucceeded.setEventID(eventId.toString());
withdrawalSucceeded.setOccuredAt(OffsetDateTime.parse(createdAt));
@@ -114,4 +122,24 @@ private String initRequestBody(
}
}
+ private Fee calculateFee(String withdrawalId, Long eventId) {
+ try {
+ var withdrawalState = withdrawalClient.getWithdrawalInfo(withdrawalId, eventId);
+ if (withdrawalState != null
+ && withdrawalState.getEffectiveFinalCashFlow() != null
+ && withdrawalState.getEffectiveFinalCashFlow().getPostings() != null) {
+ long amount = CashFlowUtils.getWithdrawalFee(withdrawalState.getEffectiveFinalCashFlow().getPostings());
+ String currency = withdrawalState.getBody().getCurrency().getSymbolicCode();
+ return new Fee().amount(amount).currency(currency);
+ }
+ log.warn("Unable to calculate fee for withdrawalId={}, eventId={}: missing cash flow data",
+ withdrawalId, eventId);
+ return null;
+ } catch (Exception e) {
+ log.warn("Error calculating fee for withdrawalId={}, eventId={}: {}",
+ withdrawalId, eventId, e.getMessage());
+ return null;
+ }
+ }
+
}
diff --git a/src/main/java/dev/vality/wallets/hooker/service/WithdrawalClient.java b/src/main/java/dev/vality/wallets/hooker/service/WithdrawalClient.java
new file mode 100644
index 0000000..c71c372
--- /dev/null
+++ b/src/main/java/dev/vality/wallets/hooker/service/WithdrawalClient.java
@@ -0,0 +1,45 @@
+package dev.vality.wallets.hooker.service;
+
+import dev.vality.fistful.base.EventRange;
+import dev.vality.fistful.withdrawal.ManagementSrv;
+import dev.vality.fistful.withdrawal.WithdrawalState;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class WithdrawalClient {
+
+ private final ManagementSrv.Iface withdrawalFistfulClient;
+ private final RetryTemplate retryTemplate;
+
+ public WithdrawalState getWithdrawalInfo(String withdrawalId, long eventId) {
+ try {
+ return retryTemplate.execute(context -> {
+ log.debug("Attempt {} to fetch withdrawal state for withdrawalId={}, eventId={}",
+ context.getRetryCount() + 1, withdrawalId, eventId);
+
+ WithdrawalState withdrawalState = withdrawalFistfulClient.get(withdrawalId, createEventRange(eventId));
+ if (withdrawalState == null) {
+ log.warn("Withdrawal not found for withdrawalId={}, eventId={} (attempt {})",
+ withdrawalId, eventId, context.getRetryCount() + 1);
+ throw new RuntimeException("Withdrawal not found!");
+ }
+
+ log.debug("Successfully fetched withdrawal state for withdrawalId={} (attempt {})",
+ withdrawalId, context.getRetryCount() + 1);
+ return withdrawalState;
+ });
+ } catch (Exception e) {
+ log.error("Error fetching withdrawal state for withdrawalId={}, eventId={}", withdrawalId, eventId, e);
+ throw new RuntimeException("Failed to fetch withdrawal state", e);
+ }
+ }
+
+ private EventRange createEventRange(long eventId) {
+ return new EventRange().setLimit((int) eventId);
+ }
+}
diff --git a/src/main/java/dev/vality/wallets/hooker/utils/CashFlowUtils.java b/src/main/java/dev/vality/wallets/hooker/utils/CashFlowUtils.java
new file mode 100644
index 0000000..fc539ed
--- /dev/null
+++ b/src/main/java/dev/vality/wallets/hooker/utils/CashFlowUtils.java
@@ -0,0 +1,19 @@
+package dev.vality.wallets.hooker.utils;
+
+import dev.vality.fistful.cashflow.FinalCashFlowPosting;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CashFlowUtils {
+
+ public static long getWithdrawalFee(List postings) {
+ return postings.stream()
+ .filter(posting -> posting.getSource().getAccountType().isSetWallet()
+ && posting.getDestination().getAccountType().isSetSystem())
+ .map(posting -> posting.getVolume().getAmount())
+ .reduce(0L, Long::sum);
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 67a0b85..95ef260 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -90,8 +90,19 @@ parent:
exist:
id: -1
+service:
+ withdrawal:
+ url: http://fistful:8022/v1/
+ networkTimeout: 5000
+ retry:
+ maxAttempts: 10
+ initialIntervalMs: 1000
+ multiplier: 2.0
+ maxIntervalMs: 10000
+
testcontainers:
postgresql:
tag: '11.4'
kafka:
- tag: '6.2.0'
+ confluent:
+ tag: '7.8.0'
diff --git a/src/test/java/dev/vality/wallets/hooker/config/KafkaPostgresqlSpringBootITest.java b/src/test/java/dev/vality/wallets/hooker/config/KafkaPostgresqlSpringBootITest.java
index ccf23b7..42a74eb 100644
--- a/src/test/java/dev/vality/wallets/hooker/config/KafkaPostgresqlSpringBootITest.java
+++ b/src/test/java/dev/vality/wallets/hooker/config/KafkaPostgresqlSpringBootITest.java
@@ -1,7 +1,8 @@
package dev.vality.wallets.hooker.config;
-import dev.vality.testcontainers.annotations.KafkaConfig;
+import dev.vality.testcontainers.annotations.KafkaTestConfig;
import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainerSingleton;
+import dev.vality.testcontainers.annotations.kafka.constants.Provider;
import dev.vality.testcontainers.annotations.postgresql.PostgresqlTestcontainerSingleton;
import org.springframework.boot.test.context.SpringBootTest;
@@ -14,6 +15,7 @@
@Retention(RetentionPolicy.RUNTIME)
@PostgresqlTestcontainerSingleton
@KafkaTestcontainerSingleton(
+ provider = Provider.CONFLUENT,
properties = {
"kafka.topic.wallet.listener.enabled=true",
"kafka.topic.withdrawal.listener.enabled=true",
@@ -24,6 +26,6 @@
"kafka.topic.withdrawal.name",
"kafka.topic.destination.name"})
@SpringBootTest
-@KafkaConfig
+@KafkaTestConfig
public @interface KafkaPostgresqlSpringBootITest {
}
diff --git a/src/test/java/dev/vality/wallets/hooker/dao/webhook/WebHookDaoImplTest.java b/src/test/java/dev/vality/wallets/hooker/dao/webhook/WebHookDaoImplTest.java
index 609a5d3..ab8d744 100644
--- a/src/test/java/dev/vality/wallets/hooker/dao/webhook/WebHookDaoImplTest.java
+++ b/src/test/java/dev/vality/wallets/hooker/dao/webhook/WebHookDaoImplTest.java
@@ -4,8 +4,11 @@
import dev.vality.wallets.hooker.domain.WebHookModel;
import dev.vality.wallets.hooker.domain.enums.EventType;
import dev.vality.wallets.hooker.domain.tables.pojos.Webhook;
+import dev.vality.wallets.hooker.service.WithdrawalClient;
+import dev.vality.fistful.withdrawal.ManagementSrv;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.bean.override.mockito.MockitoBean;
import java.util.LinkedHashSet;
import java.util.List;
@@ -21,6 +24,12 @@ public class WebHookDaoImplTest {
@Autowired
private WebHookDao webHookDao;
+ @MockitoBean
+ private WithdrawalClient withdrawalClient;
+
+ @MockitoBean
+ private ManagementSrv.Iface withdrawalFistfulClient;
+
@Test
void create() {
LinkedHashSet eventTypes = new LinkedHashSet<>();
diff --git a/src/test/java/dev/vality/wallets/hooker/handler/DestinationEventHandlerTest.java b/src/test/java/dev/vality/wallets/hooker/handler/DestinationEventHandlerTest.java
index 1231ede..591f0c3 100644
--- a/src/test/java/dev/vality/wallets/hooker/handler/DestinationEventHandlerTest.java
+++ b/src/test/java/dev/vality/wallets/hooker/handler/DestinationEventHandlerTest.java
@@ -5,8 +5,9 @@
import dev.vality.wallets.hooker.domain.WebHookModel;
import dev.vality.wallets.hooker.domain.enums.EventType;
import dev.vality.wallets.hooker.service.WebHookMessageSenderService;
+import dev.vality.wallets.hooker.service.WithdrawalClient;
+import dev.vality.fistful.withdrawal.ManagementSrv;
import dev.vality.wallets.hooker.service.kafka.DestinationEventService;
-import dev.vality.wallets.hooker.service.kafka.WithdrawalEventService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,6 +36,12 @@ class DestinationEventHandlerTest {
@MockitoBean
private WebHookMessageSenderService webHookMessageSenderService;
+ @MockitoBean
+ private WithdrawalClient withdrawalClient;
+
+ @MockitoBean
+ private ManagementSrv.Iface withdrawalFistfulClient;
+
@Test
void failHandleDestinationCreated() {
WebHookModel webhook = TestBeanFactory.createWebhookModel();
@@ -61,4 +68,4 @@ void handleDestinationCreatedAndAccountChange() {
verify(webHookMessageSenderService, times(1))
.send(any());
}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/dev/vality/wallets/hooker/handler/WithdrawalEventHandlerTest.java b/src/test/java/dev/vality/wallets/hooker/handler/WithdrawalEventHandlerTest.java
index c580946..32c0254 100644
--- a/src/test/java/dev/vality/wallets/hooker/handler/WithdrawalEventHandlerTest.java
+++ b/src/test/java/dev/vality/wallets/hooker/handler/WithdrawalEventHandlerTest.java
@@ -5,6 +5,8 @@
import dev.vality.wallets.hooker.dao.webhook.WebHookDao;
import dev.vality.wallets.hooker.domain.WebHookModel;
import dev.vality.wallets.hooker.service.WebHookMessageSenderService;
+import dev.vality.wallets.hooker.service.WithdrawalClient;
+import dev.vality.fistful.withdrawal.ManagementSrv;
import dev.vality.wallets.hooker.service.kafka.WithdrawalEventService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -29,6 +31,12 @@ class WithdrawalEventHandlerTest {
@MockitoBean
private WebHookMessageSenderService webHookMessageSenderService;
+ @MockitoBean
+ private WithdrawalClient withdrawalClient;
+
+ @MockitoBean
+ private ManagementSrv.Iface withdrawalFistfulClient;
+
@Test
void handleWithdrawalCreatedAndAndStatusChange() throws InterruptedException {
WebHookModel webhook = TestBeanFactory.createWebhookModel();
diff --git a/src/test/java/dev/vality/wallets/hooker/kafka/KafkaProducer.java b/src/test/java/dev/vality/wallets/hooker/kafka/KafkaProducer.java
index 5e4190d..ad2fbe6 100644
--- a/src/test/java/dev/vality/wallets/hooker/kafka/KafkaProducer.java
+++ b/src/test/java/dev/vality/wallets/hooker/kafka/KafkaProducer.java
@@ -2,7 +2,7 @@
import dev.vality.machinegun.eventsink.MachineEvent;
import dev.vality.machinegun.eventsink.SinkEvent;
-import dev.vality.testcontainers.annotations.kafka.config.KafkaProducerConfig;
+import dev.vality.testcontainers.annotations.kafka.config.KafkaProducerTestConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TBase;
import org.springframework.beans.factory.annotation.Autowired;
@@ -13,7 +13,7 @@
import java.time.format.DateTimeFormatter;
@TestComponent
-@Import(KafkaProducerConfig.class)
+@Import(KafkaProducerTestConfig.class)
@Slf4j
public class KafkaProducer {
diff --git a/src/test/java/dev/vality/wallets/hooker/kafka/WebhookServiceTest.java b/src/test/java/dev/vality/wallets/hooker/kafka/WebhookServiceTest.java
index 5704e66..86ab081 100644
--- a/src/test/java/dev/vality/wallets/hooker/kafka/WebhookServiceTest.java
+++ b/src/test/java/dev/vality/wallets/hooker/kafka/WebhookServiceTest.java
@@ -5,6 +5,8 @@
import dev.vality.wallets.hooker.config.KafkaPostgresqlSpringBootITest;
import dev.vality.wallets.hooker.handler.TestBeanFactory;
import dev.vality.wallets.hooker.service.WebHookMessageSenderService;
+import dev.vality.wallets.hooker.service.WithdrawalClient;
+import dev.vality.fistful.withdrawal.ManagementSrv;
import dev.vality.wallets.hooker.service.kafka.DestinationEventService;
import dev.vality.wallets.hooker.service.kafka.WithdrawalEventService;
import dev.vality.webhook.dispatcher.WebhookMessage;
@@ -15,6 +17,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.bean.override.mockito.MockitoBean;
import java.time.Duration;
import java.util.ArrayList;
@@ -29,6 +32,12 @@
@TestPropertySource(properties = "merchant.callback.timeout=1")
class WebhookServiceTest {
+ @MockitoBean
+ private WithdrawalClient withdrawalClient;
+
+ @MockitoBean
+ private ManagementSrv.Iface withdrawalFistfulClient;
+
private static final String TEST = "/test";
private static final String URL_2 = TEST + "/qwe";