Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 58 additions & 83 deletions .idea/workspace.xml

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions antifraud-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-kafka</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
Expand All @@ -68,10 +71,6 @@
<artifactId>transaction-service</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.devpull.antifraudservice.config;

import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand All @@ -16,16 +17,19 @@
public class KafkaProducerConfig {

@Bean
public ProducerFactory<String, byte[]> producerFactory() {
public ProducerFactory<String, byte[]> producerFactory(
@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers
) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put("spring.json.add.type.headers", false);
return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
public KafkaTemplate<String, byte[]> kafkaTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package com.devpull.antifraudservice.controller;

import com.devpull.antifraudservice.dto.TransactionCreatedEvent;
import com.devpull.antifraudservice.dto.TransactionStatusChangedEvent;
import com.devpull.antifraudservice.kafka.EventProducer;
import com.devpull.antifraudservice.service.AntifraudEvaluationService;
import com.devpull.transactionservice.domain.enums.TransactionStatus;
import com.devpull.transactionservice.domain.model.Transaction;
import jakarta.validation.Valid;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
Expand All @@ -11,20 +16,32 @@

@RestController
@RequestMapping("/api/v1/antifraud")
@Slf4j
public class AntifraudController {

private final EventProducer eventProducer;

public AntifraudController(EventProducer eventProducer){
private final AntifraudEvaluationService evaluationService;


public AntifraudController(EventProducer eventProducer,
AntifraudEvaluationService evaluationService) {
this.eventProducer = eventProducer;
this.evaluationService = evaluationService;
}

@PostMapping
public String evaluateAntifraud(@RequestBody Transaction transaction) throws Exception {
String msj = transaction.getAmount() > 1000 ? "REJECT" : "APPROVED";
transaction.setStatus(TransactionStatus.valueOf(msj));
eventProducer.sendMessage(transaction);
public ResponseEntity<String> evaluateAntifraud(@Valid @RequestBody TransactionCreatedEvent transaction) {
log.info("[ANTIFRAUD] Evaluating transactionId={}, amount={}",
transaction.transactionId(), transaction.amount());

TransactionStatusChangedEvent resultEvent = evaluationService.evaluate(transaction);

eventProducer.sendMessage(resultEvent);

log.info("[ANTIFRAUD] Evaluation done transactionId={}, status={}",
resultEvent.transactionId(), resultEvent.status());

return "Event handled successfully!";
return ResponseEntity.ok("Event handled successfully!");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.devpull.antifraudservice.dto;

import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;

import java.util.UUID;

public record TransactionCreatedEvent(
@NotNull UUID transactionId,
@NotNull UUID accountId,
@NotNull String type,
@NotNull @Positive Double amount
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.devpull.antifraudservice.dto;

import com.devpull.antifraudservice.enums.TransactionStatus;
import jakarta.validation.constraints.NotNull;

import java.time.Instant;
import java.util.UUID;

public record TransactionStatusChangedEvent(
@NotNull UUID transactionId,
@NotNull TransactionStatus status,
Instant evaluatedAt,
String reason
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.devpull.antifraudservice.enums;

public enum TransactionStatus {
APPROVED,
REJECTED
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package com.devpull.antifraudservice.kafka;

import com.devpull.transactionservice.domain.model.Transaction;
import com.devpull.antifraudservice.dto.TransactionStatusChangedEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import tools.jackson.databind.ObjectMapper;

Expand All @@ -16,24 +13,24 @@ public class EventProducer {

private final KafkaTemplate<String, byte[]> kafkaTemplate;
private final ObjectMapper objectMapper;
private final NewTopic topic;
private final String topicName;

public EventProducer(
KafkaTemplate<String, byte[]> kafkaTemplate,
ObjectMapper objectMapper,
NewTopic topic
) {
public EventProducer(KafkaTemplate<String, byte[]> kafkaTemplate,
NewTopic topic,
ObjectMapper objectMapper) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
this.topic = topic;
this.topicName = topic.name();
}

public void sendMessage(Transaction transaction) {
public void sendMessage(TransactionStatusChangedEvent event) {
try {
byte[] payload = objectMapper.writeValueAsBytes(transaction);
kafkaTemplate.send(topic.name(), payload);
byte[] payload = objectMapper.writeValueAsBytes(event);
kafkaTemplate.send(topicName, event.transactionId().toString(), payload);
log.info("[ANTIFRAUD] Sent status event txId={}, status={}", event.transactionId(), event.status());
} catch (Exception e) {
log.error("Error serializing transaction event", e);
log.error("[ANTIFRAUD] Failed to serialize/send event", e);
throw new RuntimeException("Failed to produce event", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.devpull.antifraudservice.service;

import com.devpull.antifraudservice.dto.TransactionCreatedEvent;
import com.devpull.antifraudservice.dto.TransactionStatusChangedEvent;
import com.devpull.antifraudservice.enums.TransactionStatus;
import org.springframework.stereotype.Service;

import java.time.Instant;

@Service
public class AntifraudEvaluationService {

private static final double THRESHOLD = 1000.0;

public TransactionStatusChangedEvent evaluate(TransactionCreatedEvent tx) {
boolean rejected = tx.amount() != null && tx.amount() > THRESHOLD;

TransactionStatus status = rejected ? TransactionStatus.REJECTED : TransactionStatus.APPROVED;

String reason = rejected
? "Amount exceeds threshold (" + THRESHOLD + ")"
: "OK";

return new TransactionStatusChangedEvent(
tx.transactionId(),
status,
Instant.now(),
reason
);
}
}
14 changes: 7 additions & 7 deletions antifraud-service/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ spring:
group-id: ${KAFKA_CONSUMER_GROUP:antifraud-service}
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: "*"
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

topic:
name: ${KAFKA_TOPIC_TRANSACTIONS:transactions_fraud}

logging:
level:
io.r2dbc.postgresql.QUERY: ${LOG_R2DBC_QUERY:DEBUG}
io.r2dbc.postgresql.PARAM: ${LOG_R2DBC_PARAM:DEBUG}
5 changes: 5 additions & 0 deletions transaction-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
<version>4.1.0-M1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns-native-macos</artifactId>
<classifier>osx-aarch_64</classifier>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.devpull.transactionservice;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TransactionServiceApplication {

public static void main(String[] args) {
SpringApplication.run(TransactionServiceApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.devpull.transactionservice.adapter.in.kafka;

import com.devpull.transactionservice.adapter.in.web.dto.kafka.TransactionStatusChangedEvent;
import com.devpull.transactionservice.application.port.in.UpdateTransactionStatusUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tools.jackson.databind.ObjectMapper;

@Component
@RequiredArgsConstructor
@Slf4j
public class AntifraudResultListener {

private final ObjectMapper objectMapper;
private final UpdateTransactionStatusUseCase useCase;

@KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consume(byte[] payload) {
try{
TransactionStatusChangedEvent event =
objectMapper.readValue(payload, TransactionStatusChangedEvent.class);

log.info("Received antifraud event txId={}, status={}",
event.transactionId(), event.status());

useCase.updateStatus(event.transactionId(), event.status())
.doOnError(e -> log.error("Failed updating tx {}", event.transactionId(), e))
.subscribe();

} catch (Exception e) {
log.error("Failed to deserialize antifraud payload", e);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.devpull.transactionservice.adapter.in.kafka;

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration
@EnableKafka
public class KafkaEnableConfig {
}

This file was deleted.

Loading