diff --git a/admin/admin-app/pom.xml b/admin/admin-app/pom.xml index 3b6652038..e81d14efa 100644 --- a/admin/admin-app/pom.xml +++ b/admin/admin-app/pom.xml @@ -65,6 +65,10 @@ co.nilin.opex.admin admin-service-auth + + co.nilin.opex.admin + admin-submitter-kafka + co.nilin.opex.utility.error error-handler diff --git a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/config/SecurityConfig.kt b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/config/SecurityConfig.kt index 3f7785757..ec7fe45dc 100644 --- a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/config/SecurityConfig.kt +++ b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/config/SecurityConfig.kt @@ -1,6 +1,6 @@ package co.nilin.opex.admin.app.config -import co.nilin.opex.admin.app.utils.hasRealmRole +import co.nilin.opex.admin.app.utils.hasRole import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity @@ -20,7 +20,8 @@ class SecurityConfig(private val webClient: WebClient) { fun springSecurityFilterChain(http: ServerHttpSecurity): SecurityWebFilterChain? { http.csrf().disable() .authorizeExchange() - .pathMatchers("/auth/**").hasRealmRole("SCOPE_trust", "finance-admin") + .pathMatchers("/auth/**").hasRole("SCOPE_trust", "finance-admin") + .pathMatchers("/system/**").hasRole("SCOPE_trust", "system-admin") .anyExchange().authenticated() .and() .oauth2ResourceServer() diff --git a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/controller/SystemConfigController.kt b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/controller/SystemConfigController.kt new file mode 100644 index 000000000..993f14c32 --- /dev/null +++ b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/controller/SystemConfigController.kt @@ -0,0 +1,33 @@ +package co.nilin.opex.admin.app.controller + +import co.nilin.opex.admin.app.data.AddCurrencyRequest +import co.nilin.opex.admin.app.data.EditCurrencyRequest +import co.nilin.opex.admin.app.service.SystemConfigService +import co.nilin.opex.utility.error.data.OpexError +import co.nilin.opex.utility.error.data.OpexException +import org.springframework.web.bind.annotation.* + +@RestController +@RequestMapping("/system/v1") +class SystemConfigController(private val service: SystemConfigService) { + + @PostMapping("/currency") + suspend fun addCurrency(@RequestBody body: AddCurrencyRequest) { + if (!body.isValid()) + throw OpexException(OpexError.BadRequest) + service.addCurrency(body) + } + + @PutMapping("/currency/{name}") + suspend fun editCurrency(@RequestBody body: EditCurrencyRequest, @PathVariable name: String) { + if (!body.isValid()) + throw OpexException(OpexError.BadRequest) + service.editCurrency(name, body) + } + + @DeleteMapping("/currency/{name}") + suspend fun deleteCurrency(@PathVariable name: String) { + service.deleteCurrency(name) + } + +} \ No newline at end of file diff --git a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/data/AddCurrencyRequest.kt b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/data/AddCurrencyRequest.kt new file mode 100644 index 000000000..06a9fdb22 --- /dev/null +++ b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/data/AddCurrencyRequest.kt @@ -0,0 +1,13 @@ +package co.nilin.opex.admin.app.data + +data class AddCurrencyRequest( + val name: String?, + val symbol: String?, + val precision: Double +) { + + fun isValid(): Boolean { + return !name.isNullOrEmpty() && !symbol.isNullOrEmpty() && precision > 0.0 && precision <= 1 + } + +} \ No newline at end of file diff --git a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/data/EditCurrencyRequest.kt b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/data/EditCurrencyRequest.kt new file mode 100644 index 000000000..1e00d3a3d --- /dev/null +++ b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/data/EditCurrencyRequest.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.admin.app.data + +data class EditCurrencyRequest( + val symbol: String?, + val precision: Double +){ + fun isValid(): Boolean { + return !symbol.isNullOrEmpty() && precision > 0.0 + } +} \ No newline at end of file diff --git a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/service/SystemConfigService.kt b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/service/SystemConfigService.kt new file mode 100644 index 000000000..73523e8bb --- /dev/null +++ b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/service/SystemConfigService.kt @@ -0,0 +1,28 @@ +package co.nilin.opex.admin.app.service + +import co.nilin.opex.admin.app.data.AddCurrencyRequest +import co.nilin.opex.admin.app.data.EditCurrencyRequest +import co.nilin.opex.admin.core.events.AddCurrencyEvent +import co.nilin.opex.admin.core.events.DeleteCurrencyEvent +import co.nilin.opex.admin.core.events.EditCurrencyEvent +import co.nilin.opex.admin.core.spi.AdminEventPublisher +import org.springframework.stereotype.Service + +@Service +class SystemConfigService(private val publisher: AdminEventPublisher) { + + suspend fun addCurrency(body: AddCurrencyRequest) { + with(body) { + publisher.publish(AddCurrencyEvent(name!!, symbol!!, precision)) + } + } + + suspend fun editCurrency(name: String, body: EditCurrencyRequest) { + publisher.publish(EditCurrencyEvent(name, body.symbol!!, body.precision)) + } + + suspend fun deleteCurrency(name: String) { + publisher.publish(DeleteCurrencyEvent(name)) + } + +} \ No newline at end of file diff --git a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/utils/Extensions.kt b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/utils/Extensions.kt index 824c3f3c8..569ba55d5 100644 --- a/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/utils/Extensions.kt +++ b/admin/admin-app/src/main/kotlin/co/nilin/opex/admin/app/utils/Extensions.kt @@ -1,20 +1,17 @@ package co.nilin.opex.admin.app.utils import com.nimbusds.jose.shaded.json.JSONArray -import com.nimbusds.jose.shaded.json.JSONObject import org.springframework.security.authorization.AuthorizationDecision import org.springframework.security.config.web.server.ServerHttpSecurity import org.springframework.security.oauth2.jwt.Jwt -fun ServerHttpSecurity.AuthorizeExchangeSpec.Access.hasRealmRole( +fun ServerHttpSecurity.AuthorizeExchangeSpec.Access.hasRole( authority: String, role: String ): ServerHttpSecurity.AuthorizeExchangeSpec = access { mono, _ -> mono.map { auth -> - auth.authorities.any { it.authority == authority } - && (((auth.principal as Jwt).claims["realm_access"] as JSONObject?)?.get("roles") as JSONArray?) - ?.contains(role) == true - }.map { granted -> - AuthorizationDecision(granted) + val hasAuthority = auth.authorities.any { it.authority == authority } + val hasRole = ((auth.principal as Jwt).claims["roles"] as JSONArray?)?.contains(role) == true + AuthorizationDecision(hasAuthority && hasRole) } } \ No newline at end of file diff --git a/admin/admin-app/src/main/resources/application.yml b/admin/admin-app/src/main/resources/application.yml index 1218ec694..2696381ca 100644 --- a/admin/admin-app/src/main/resources/application.yml +++ b/admin/admin-app/src/main/resources/application.yml @@ -5,6 +5,10 @@ spring: name: opex-admin main: allow-bean-definition-overriding: true + kafka: + bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092} + consumer: + group-id: admin cloud: bootstrap: enabled: true @@ -34,6 +38,7 @@ spring: app: auth: cert-url: lb://opex-auth/auth/realms/opex/protocol/openid-connect/certs + token-url: lb://opex-auth/auth/realms/opex/protocol/openid-connect/token keycloak: url: http://auth:8080/auth realm: opex diff --git a/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/AddCurrencyEvent.kt b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/AddCurrencyEvent.kt new file mode 100644 index 000000000..95bfb79b8 --- /dev/null +++ b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/AddCurrencyEvent.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.admin.core.events + +data class AddCurrencyEvent( + val name: String, + val symbol: String, + val precision: Double +) : AdminEvent() \ No newline at end of file diff --git a/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/AdminEvent.kt b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/AdminEvent.kt new file mode 100644 index 000000000..5aaee8941 --- /dev/null +++ b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/AdminEvent.kt @@ -0,0 +1,4 @@ +package co.nilin.opex.admin.core.events + +abstract class AdminEvent { +} \ No newline at end of file diff --git a/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/DeleteCurrencyEvent.kt b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/DeleteCurrencyEvent.kt new file mode 100644 index 000000000..a55cb69f3 --- /dev/null +++ b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/DeleteCurrencyEvent.kt @@ -0,0 +1,3 @@ +package co.nilin.opex.admin.core.events + +data class DeleteCurrencyEvent(val name: String) : AdminEvent() \ No newline at end of file diff --git a/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/EditCurrencyEvent.kt b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/EditCurrencyEvent.kt new file mode 100644 index 000000000..4697880e8 --- /dev/null +++ b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/events/EditCurrencyEvent.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.admin.core.events + +data class EditCurrencyEvent( + val name: String, + val symbol: String, + val precision: Double +) : AdminEvent() \ No newline at end of file diff --git a/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/spi/AdminEventPublisher.kt b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/spi/AdminEventPublisher.kt new file mode 100644 index 000000000..0480f845c --- /dev/null +++ b/admin/admin-core/src/main/kotlin/co/nilin/opex/admin/core/spi/AdminEventPublisher.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.admin.core.spi + +import co.nilin.opex.admin.core.events.AdminEvent + +interface AdminEventPublisher { + + suspend fun publish(event: AdminEvent) + +} \ No newline at end of file diff --git a/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/controller/AuthAdminController.kt b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/controller/AuthAdminController.kt index 0cd8e399b..334340d86 100644 --- a/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/controller/AuthAdminController.kt +++ b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/controller/AuthAdminController.kt @@ -1,9 +1,11 @@ package co.nilin.opex.admin.ports.auth.controller +import co.nilin.opex.admin.ports.auth.data.ImpersonateRequest import co.nilin.opex.admin.ports.auth.data.KeycloakUser import co.nilin.opex.admin.ports.auth.data.KycGroup import co.nilin.opex.admin.ports.auth.service.AuthAdminService import co.nilin.opex.admin.ports.auth.utils.asKeycloakUser +import org.springframework.http.MediaType import org.springframework.web.bind.annotation.* @RestController @@ -35,4 +37,9 @@ class AuthAdminController(private val service: AuthAdminService) { return service.findUsersInGroupByName(groupName).map { it.asKeycloakUser() } } + @PostMapping("/user/impersonate", produces = [MediaType.APPLICATION_JSON_VALUE]) + suspend fun impersonate(@RequestBody body: ImpersonateRequest): String { + return service.impersonate(body.clientId, body.clientSecret, body.userId) + } + } \ No newline at end of file diff --git a/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/data/ImpersonateRequest.kt b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/data/ImpersonateRequest.kt new file mode 100644 index 000000000..079a55fe4 --- /dev/null +++ b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/data/ImpersonateRequest.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.admin.ports.auth.data + +data class ImpersonateRequest( + val clientId: String, + val clientSecret: String, + val userId: String +) \ No newline at end of file diff --git a/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/proxy/KeycloakProxy.kt b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/proxy/KeycloakProxy.kt new file mode 100644 index 000000000..caaf0f29f --- /dev/null +++ b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/proxy/KeycloakProxy.kt @@ -0,0 +1,44 @@ +package co.nilin.opex.admin.ports.auth.proxy + +import kotlinx.coroutines.reactor.awaitSingle +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Value +import org.springframework.http.MediaType +import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.BodyInserters +import org.springframework.web.reactive.function.client.WebClient +import org.springframework.web.reactive.function.client.bodyToMono + +@Component +class KeycloakProxy(private val webClient: WebClient) { + + private val logger = LoggerFactory.getLogger(KeycloakProxy::class.java) + + @Value("\${app.auth.token-url}") + private lateinit var tokenUrl: String + + suspend fun impersonate( + token: String, + clientId: String, + clientSecret: String, + userId: String + ): String { + val body = BodyInserters.fromFormData("client_id", clientId) + .with("client_secret", clientSecret) + .with("requested_subject", userId) + .with("subject_token", token) + .with("grant_type", "urn:ietf:params:oauth:grant-type:token-exchange") + + logger.info("Request token exchange for user $userId and client $clientId") + return webClient.post() + .uri(tokenUrl) + .accept(MediaType.APPLICATION_JSON) + .header("Content-Type", "application/x-www-form-urlencoded") + .body(body) + .retrieve() + .onStatus({ t -> t.isError }, { it.createException() }) + .bodyToMono() + .awaitSingle() + } + +} \ No newline at end of file diff --git a/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/service/AuthAdminService.kt b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/service/AuthAdminService.kt index 26411d709..e5e623ff7 100644 --- a/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/service/AuthAdminService.kt +++ b/admin/admin-ports/admin-service-auth/src/main/kotlin/co/nilin/opex/admin/ports/auth/service/AuthAdminService.kt @@ -1,6 +1,7 @@ package co.nilin.opex.admin.ports.auth.service import co.nilin.opex.admin.ports.auth.data.KycGroup +import co.nilin.opex.admin.ports.auth.proxy.KeycloakProxy import co.nilin.opex.utility.error.data.OpexError import co.nilin.opex.utility.error.data.OpexException import org.keycloak.admin.client.Keycloak @@ -10,7 +11,11 @@ import org.keycloak.representations.idm.UserRepresentation import org.springframework.stereotype.Service @Service -class AuthAdminService(private val keycloak: Keycloak, private val opexRealm: RealmResource) { +class AuthAdminService( + private val keycloak: Keycloak, + private val opexRealm: RealmResource, + private val proxy: KeycloakProxy +) { fun findAllUsers(): List { return opexRealm.users().list() @@ -58,4 +63,10 @@ class AuthAdminService(private val keycloak: Keycloak, private val opexRealm: Re } } + suspend fun impersonate(clientId: String, clientSecret: String, userId: String): String { + opexRealm.users().get(userId) ?: throw OpexException(OpexError.NotFound, "User not found") + val token = keycloak.tokenManager().accessToken.token + return proxy.impersonate(token, clientId, clientSecret, userId) + } + } \ No newline at end of file diff --git a/admin/admin-ports/admin-submitter-kafka/.gitignore b/admin/admin-ports/admin-submitter-kafka/.gitignore new file mode 100644 index 000000000..549e00a2a --- /dev/null +++ b/admin/admin-ports/admin-submitter-kafka/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/admin/admin-ports/admin-submitter-kafka/pom.xml b/admin/admin-ports/admin-submitter-kafka/pom.xml new file mode 100644 index 000000000..2f0b78b2b --- /dev/null +++ b/admin/admin-ports/admin-submitter-kafka/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + + co.nilin.opex.admin + admin + 1.0-SNAPSHOT + ../../pom.xml + + + admin-submitter-kafka + admin-submitter-kafka + Kafka message submitter + + + + org.jetbrains.kotlin + kotlin-reflect + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-webflux + + + co.nilin.opex.admin + admin-core + + + org.springframework.kafka + spring-kafka + + + io.projectreactor.kotlin + reactor-kotlin-extensions + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + + + org.springframework.kafka + spring-kafka-test + test + + + + diff --git a/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/config/KafkaConfig.kt b/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/config/KafkaConfig.kt new file mode 100644 index 000000000..99a9894f2 --- /dev/null +++ b/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/config/KafkaConfig.kt @@ -0,0 +1,56 @@ +package co.nilin.opex.admin.ports.kafka.submitter.config + +import co.nilin.opex.admin.core.events.AdminEvent +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext +import org.springframework.kafka.config.TopicBuilder +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.support.serializer.JsonSerializer +import java.util.function.Supplier + +@Configuration +class KafkaConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean + fun producerConfigs(): Map { + return mapOf( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java, + ProducerConfig.ACKS_CONFIG to "all", + JsonSerializer.TYPE_MAPPINGS to "admin_add_currency:co.nilin.opex.admin.core.events.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.admin.core.events.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.admin.core.events.DeleteCurrencyEvent" + ) + } + + @Bean + fun producerFactory(config: Map): ProducerFactory { + return DefaultKafkaProducerFactory(config) + } + + @Bean + fun kafkaTemplate(factory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(factory) + } + + @Autowired + fun createTopic(applicationContext: GenericApplicationContext) { + applicationContext.registerBean("topic_admin_event", NewTopic::class.java, Supplier { + TopicBuilder.name("admin_event") + .partitions(1) + .replicas(1) + .build() + }) + } + +} \ No newline at end of file diff --git a/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/service/AdminKafkaEventPublisher.kt b/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/service/AdminKafkaEventPublisher.kt new file mode 100644 index 000000000..d27a0c57a --- /dev/null +++ b/admin/admin-ports/admin-submitter-kafka/src/main/kotlin/co/nilin/opex/admin/ports/kafka/submitter/service/AdminKafkaEventPublisher.kt @@ -0,0 +1,23 @@ +package co.nilin.opex.admin.ports.kafka.submitter.service + +import co.nilin.opex.admin.core.events.AdminEvent +import co.nilin.opex.admin.core.spi.AdminEventPublisher +import org.slf4j.LoggerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Component +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +@Component +class AdminKafkaEventPublisher(private val kafkaTemplate: KafkaTemplate) : AdminEventPublisher { + + private val logger = LoggerFactory.getLogger(AdminKafkaEventPublisher::class.java) + + override suspend fun publish(event: AdminEvent): Unit = suspendCoroutine { cont -> + logger.info("Publishing admin event: $event") + val sendFuture = kafkaTemplate.send("admin_event", event) + sendFuture.addCallback({ cont.resume(Unit) }, { cont.resumeWithException(it) }) + } + +} \ No newline at end of file diff --git a/admin/pom.xml b/admin/pom.xml index 2eeb757b8..3a6c9ae9e 100644 --- a/admin/pom.xml +++ b/admin/pom.xml @@ -19,6 +19,7 @@ admin-app admin-core admin-ports/admin-service-auth + admin-ports/admin-submitter-kafka @@ -30,6 +31,11 @@ + + co.nilin.opex.admin + admin-core + ${project.version} + co.nilin.opex.admin admin-service-auth @@ -37,7 +43,7 @@ co.nilin.opex.admin - admin-core + admin-submitter-kafka ${project.version} diff --git a/bc-gateway/bc-gateway-app/pom.xml b/bc-gateway/bc-gateway-app/pom.xml index 33b85f648..9b213843c 100644 --- a/bc-gateway/bc-gateway-app/pom.xml +++ b/bc-gateway/bc-gateway-app/pom.xml @@ -27,7 +27,6 @@ com.fasterxml.jackson.module jackson-module-kotlin - org.jetbrains.kotlin kotlin-stdlib @@ -82,6 +81,10 @@ co.nilin.opex.bcgateway.ports.walletproxy bc-gateway-wallet-proxy + + co.nilin.opex.bcgateway.ports.kafka.listener + bc-gateway-eventlistener-kafka + io.springfox springfox-boot-starter diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt index 94033e961..d6d6c73a1 100644 --- a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/AppConfig.kt @@ -10,6 +10,9 @@ import co.nilin.opex.bcgateway.core.service.ChainSyncServiceImpl import co.nilin.opex.bcgateway.core.service.InfoServiceImpl import co.nilin.opex.bcgateway.core.service.WalletSyncServiceImpl import co.nilin.opex.bcgateway.core.spi.* +import co.nilin.opex.bcgateway.ports.kafka.listener.consumer.AdminEventKafkaListener +import co.nilin.opex.bcgateway.ports.kafka.listener.spi.AdminEventListener +import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.transaction.reactive.TransactionalOperator @@ -19,30 +22,30 @@ class AppConfig { @Bean fun assignAddressService( - currencyLoader: CurrencyLoader, + currencyHandler: CurrencyHandler, assignedAddressHandler: AssignedAddressHandler, reservedAddressHandler: ReservedAddressHandler ): AssignAddressService { - return AssignAddressServiceImpl(currencyLoader, assignedAddressHandler, reservedAddressHandler) + return AssignAddressServiceImpl(currencyHandler, assignedAddressHandler, reservedAddressHandler) } @Bean fun chainSyncService( chainSyncSchedulerHandler: ChainSyncSchedulerHandler, - chainEndpointProxyFinder: ChainEndpointProxyFinder, + chainEndpointHandler: ChainEndpointHandler, chainSyncRecordHandler: ChainSyncRecordHandler, walletSyncRecordHandler: WalletSyncRecordHandler, chainSyncRetryHandler: ChainSyncRetryHandler, - currencyLoader: CurrencyLoader, + currencyHandler: CurrencyHandler, operator: TransactionalOperator ): ChainSyncService { return ChainSyncServiceImpl( chainSyncSchedulerHandler, - chainEndpointProxyFinder, + chainEndpointHandler, chainSyncRecordHandler, walletSyncRecordHandler, chainSyncRetryHandler, - currencyLoader, + currencyHandler, operator, AppDispatchers.chainSyncExecutor ) @@ -54,14 +57,14 @@ class AppConfig { walletProxy: WalletProxy, walletSyncRecordHandler: WalletSyncRecordHandler, assignedAddressHandler: AssignedAddressHandler, - currencyLoader: CurrencyLoader + currencyHandler: CurrencyHandler ): WalletSyncService { return WalletSyncServiceImpl( syncSchedulerHandler, walletProxy, walletSyncRecordHandler, assignedAddressHandler, - currencyLoader, + currencyHandler, AppDispatchers.walletSyncExecutor ) } @@ -70,4 +73,12 @@ class AppConfig { fun infoService(): InfoService { return InfoServiceImpl() } + + @Autowired + fun configureEventListeners( + adminKafkaEventListener: AdminEventKafkaListener, + adminEventListener: AdminEventListener, + ) { + adminKafkaEventListener.addEventListener(adminEventListener) + } } diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/SecurityConfig.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/SecurityConfig.kt index 02bd1b3db..ccd180c93 100644 --- a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/SecurityConfig.kt +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/config/SecurityConfig.kt @@ -1,5 +1,6 @@ package co.nilin.opex.bcgateway.app.config +import co.nilin.opex.bcgateway.app.utils.hasRole import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean @@ -21,7 +22,9 @@ class SecurityConfig(@Qualifier("loadBalanced") private val webClient: WebClient http.csrf().disable() .authorizeExchange() .pathMatchers("/filter/**").hasAuthority("SCOPE_trust") - .pathMatchers("/**").permitAll() + .pathMatchers("/admin/**").hasRole("SCOPE_trust", "system-admin") + .pathMatchers("/address/**").permitAll() + .pathMatchers("/deposit/**").permitAll() .anyExchange().authenticated() .and() .oauth2ResourceServer() diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/AdminController.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/AdminController.kt new file mode 100644 index 000000000..bcafbff4b --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/AdminController.kt @@ -0,0 +1,110 @@ +package co.nilin.opex.bcgateway.app.controller + +import co.nilin.opex.bcgateway.app.dto.* +import co.nilin.opex.bcgateway.app.service.AdminService +import co.nilin.opex.bcgateway.core.model.AddressType +import co.nilin.opex.bcgateway.core.spi.AddressTypeHandler +import co.nilin.opex.bcgateway.core.spi.ChainEndpointHandler +import co.nilin.opex.bcgateway.core.spi.ChainLoader +import co.nilin.opex.bcgateway.core.spi.CurrencyHandler +import co.nilin.opex.utility.error.data.OpexError +import co.nilin.opex.utility.error.data.OpexException +import org.springframework.web.bind.annotation.* + +@RestController +@RequestMapping("/admin") +class AdminController( + private val service: AdminService, + private val chainLoader: ChainLoader, + private val currencyHandler: CurrencyHandler, + private val addressTypeHandler: AddressTypeHandler, + private val chainEndpointHandler: ChainEndpointHandler +) { + + @GetMapping("/chain") + suspend fun getChains(): List { + return chainLoader.fetchAllChains() + .map { c -> ChainResponse(c.name, c.addressTypes.map { it.type }, c.endpoints.map { it.url }) } + } + + @PostMapping("/chain") + suspend fun addChain(@RequestBody body: AddChainRequest) { + if (!body.isValid()) + throw OpexException(OpexError.InvalidRequestBody) + service.addChain(body) + } + + @PostMapping("/chain/{chain}/endpoint") + suspend fun addChainEndpoint(@PathVariable chain: String, @RequestBody body: ChainEndpointRequest) { + chainEndpointHandler.addEndpoint(chain, body.url, body.username, body.password) + } + + @DeleteMapping("/chain/{chain}/endpoint") + suspend fun deleteChainEndpoint(@PathVariable chain: String, @RequestParam url: String) { + chainEndpointHandler.deleteEndpoint(chain, url) + } + + @GetMapping("/address/type") + suspend fun getAddressTypes(): List { + return addressTypeHandler.fetchAll() + } + + @PostMapping("/address/type") + suspend fun addAddressType(@RequestBody body: AddressTypeRequest) { + if (body.name.isNullOrEmpty() || body.addressRegex.isNullOrEmpty()) + throw OpexException(OpexError.InvalidRequestBody) + service.addAddressType(body.name, body.addressRegex, body.memoRegex) + } + + @GetMapping("/token") + suspend fun getCurrencyImplementation(): List { + return currencyHandler.fetchAllImplementations() + .map { + TokenResponse( + it.currency.symbol, + it.chain.name, + it.token, + it.tokenAddress, + it.tokenName, + it.withdrawEnabled, + it.withdrawFee, + it.withdrawMin, + it.decimal + ) + } + } + + @PostMapping("/token") + suspend fun addCurrencyImplementation(@RequestBody body: TokenRequest): TokenResponse { + val ex = OpexException(OpexError.InvalidRequestBody) + with(body) { + if (symbol.isNullOrEmpty() || chain.isNullOrEmpty()) throw ex + if (isToken && (tokenName.isNullOrEmpty() || tokenAddress.isNullOrEmpty())) throw ex + if (withdrawFee < 0 || minimumWithdraw < 0 || decimal < 0) throw ex + } + + return with(service.addToken(body)) { + TokenResponse( + currency.symbol, + chain.name, + token, + tokenAddress, + tokenName, + withdrawEnabled, + withdrawFee, + withdrawMin, + decimal + ) + } + } + + @PutMapping("/token/{symbol}_{chain}/withdraw") + suspend fun changeWithdrawStatus( + @PathVariable symbol: String, + @PathVariable chain: String, + @RequestParam("enabled") status: Boolean + ) { + service.changeTokenWithdrawStatus(symbol, chain, status) + } + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/NetworkController.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/NetworkController.kt index d101e80f8..93d3e6964 100644 --- a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/NetworkController.kt +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/controller/NetworkController.kt @@ -1,16 +1,16 @@ package co.nilin.opex.bcgateway.app.controller import co.nilin.opex.bcgateway.core.model.CurrencyInfo -import co.nilin.opex.bcgateway.core.spi.CurrencyLoader +import co.nilin.opex.bcgateway.core.spi.CurrencyHandler import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.PathVariable import org.springframework.web.bind.annotation.RestController @RestController -class NetworkController(val currencyLoader: CurrencyLoader) { +class NetworkController(val currencyHandler: CurrencyHandler) { @GetMapping("currency/{currency}") suspend fun fetchCurrencyInfo(@PathVariable("currency") currency: String): CurrencyInfo { - return currencyLoader.fetchCurrencyInfo(currency) + return currencyHandler.fetchCurrencyInfo(currency) } } \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/AddChainRequest.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/AddChainRequest.kt new file mode 100644 index 000000000..ef148cdb3 --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/AddChainRequest.kt @@ -0,0 +1,13 @@ +package co.nilin.opex.bcgateway.app.dto + +data class AddChainRequest( + val name: String?, + val addressType: String?, + val scannerEndpoint: String?, + val scheduleDelaySeconds: Int, + val scheduleErrorDelaySeconds: Int, +) { + fun isValid(): Boolean { + return !name.isNullOrEmpty() && !addressType.isNullOrEmpty() && scheduleDelaySeconds > 0 && scheduleErrorDelaySeconds > 0 + } +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/AddressTypeRequest.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/AddressTypeRequest.kt new file mode 100644 index 000000000..59117f969 --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/AddressTypeRequest.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.bcgateway.app.dto + +data class AddressTypeRequest( + val name: String?, + val addressRegex: String?, + val memoRegex: String? +) \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/ChainEndpointRequest.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/ChainEndpointRequest.kt new file mode 100644 index 000000000..9fa2a225e --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/ChainEndpointRequest.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.bcgateway.app.dto + +data class ChainEndpointRequest( + val url: String, + val username: String?, + val password: String? +) \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/ChainResponse.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/ChainResponse.kt new file mode 100644 index 000000000..d61a105e8 --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/ChainResponse.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.bcgateway.app.dto + +data class ChainResponse( + val name: String, + val addressTypes: List, + val endpoints: List +) \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/TokenRequest.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/TokenRequest.kt new file mode 100644 index 000000000..63d42b1f0 --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/TokenRequest.kt @@ -0,0 +1,13 @@ +package co.nilin.opex.bcgateway.app.dto + +data class TokenRequest( + val symbol: String?, + val chain: String?, + val isToken: Boolean, + val tokenName: String?, + val tokenAddress: String?, + val withdrawFee: Double, + val minimumWithdraw: Double, + val isWithdrawEnabled: Boolean = true, + val decimal: Int = 18 +) \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/TokenResponse.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/TokenResponse.kt new file mode 100644 index 000000000..d6cbbc9f5 --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/dto/TokenResponse.kt @@ -0,0 +1,15 @@ +package co.nilin.opex.bcgateway.app.dto + +import java.math.BigDecimal + +data class TokenResponse( + val currency: String, + val chain: String, + val isToken: Boolean, + val tokenAddress: String?, + val tokenName: String?, + val isWithdrawEnabled: Boolean, + val withdrawFee: BigDecimal, + val withdrawMin: BigDecimal, + val decimal: Int +) \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/listener/AdminEventListenerImpl.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/listener/AdminEventListenerImpl.kt new file mode 100644 index 000000000..1bcf766c7 --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/listener/AdminEventListenerImpl.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.bcgateway.app.listener + +import co.nilin.opex.bcgateway.app.service.AdminService +import co.nilin.opex.bcgateway.ports.kafka.listener.model.AddCurrencyEvent +import co.nilin.opex.bcgateway.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.bcgateway.ports.kafka.listener.model.DeleteCurrencyEvent +import co.nilin.opex.bcgateway.ports.kafka.listener.model.EditCurrencyEvent +import co.nilin.opex.bcgateway.ports.kafka.listener.spi.AdminEventListener +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class AdminEventListenerImpl(private val adminService: AdminService) : AdminEventListener { + + private val logger = LoggerFactory.getLogger(AdminEventListenerImpl::class.java) + + override fun id() = "AdminEventListener" + + override fun onEvent(event: AdminEvent, partition: Int, offset: Long, timestamp: Long): Unit = runBlocking { + logger.info("Incoming admin event $event") + when (event) { + is AddCurrencyEvent -> adminService.addCurrency(event.name, event.symbol) + is EditCurrencyEvent -> adminService.editCurrency(event.name, event.symbol) + is DeleteCurrencyEvent -> adminService.deleteCurrency(event.name) + else -> {} + } + } +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/service/AdminService.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/service/AdminService.kt new file mode 100644 index 000000000..9d366b26d --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/service/AdminService.kt @@ -0,0 +1,63 @@ +package co.nilin.opex.bcgateway.app.service + +import co.nilin.opex.bcgateway.app.dto.AddChainRequest +import co.nilin.opex.bcgateway.app.dto.TokenRequest +import co.nilin.opex.bcgateway.core.model.CurrencyImplementation +import co.nilin.opex.bcgateway.core.spi.* +import org.springframework.stereotype.Service +import org.springframework.transaction.annotation.Transactional + +@Service +class AdminService( + private val chainLoader: ChainLoader, + private val currencyHandler: CurrencyHandler, + private val chainScheduler: ChainSyncSchedulerHandler, + private val addressTypeHandler: AddressTypeHandler, + private val chainEndpointHandler: ChainEndpointHandler, +) { + + suspend fun addCurrency(name: String, symbol: String) { + currencyHandler.addCurrency(name, symbol) + } + + suspend fun editCurrency(name: String, symbol: String) { + currencyHandler.editCurrency(name, symbol) + } + + suspend fun deleteCurrency(name: String) { + currencyHandler.deleteCurrency(name) + } + + @Transactional + suspend fun addChain(body: AddChainRequest) { + val chain = chainLoader.addChain(body.name!!, body.addressType!!) + chainScheduler.scheduleChain(chain.name, body.scheduleDelaySeconds, body.scheduleErrorDelaySeconds) + if (body.scannerEndpoint != null) + chainEndpointHandler.addEndpoint(chain.name, body.scannerEndpoint, null, null) + } + + suspend fun addAddressType(name: String, addressRegex: String, memoRegex: String?) { + addressTypeHandler.addAddressType(name, addressRegex, memoRegex) + } + + suspend fun addToken(body: TokenRequest): CurrencyImplementation { + return with(body) { + currencyHandler.addCurrencyImplementation( + symbol!!, + chain!!, + tokenName, + tokenAddress, + isToken, + withdrawFee, + minimumWithdraw, + isWithdrawEnabled, + decimal + ) + } + } + + suspend fun changeTokenWithdrawStatus(symbol: String, chain: String, status: Boolean) { + currencyHandler.changeWithdrawStatus(symbol, chain, status) + } + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/utils/Extensions.kt b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/utils/Extensions.kt new file mode 100644 index 000000000..96b3f0dd0 --- /dev/null +++ b/bc-gateway/bc-gateway-app/src/main/kotlin/co/nilin/opex/bcgateway/app/utils/Extensions.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.bcgateway.app.utils + +import com.nimbusds.jose.shaded.json.JSONArray +import org.springframework.security.authorization.AuthorizationDecision +import org.springframework.security.config.web.server.ServerHttpSecurity +import org.springframework.security.oauth2.jwt.Jwt + +fun ServerHttpSecurity.AuthorizeExchangeSpec.Access.hasRole( + authority: String, + role: String +): ServerHttpSecurity.AuthorizeExchangeSpec = access { mono, _ -> + mono.map { auth -> + val hasAuthority = auth.authorities.any { it.authority == authority } + val hasRole = ((auth.principal as Jwt).claims["roles"] as JSONArray?)?.contains(role) == true + AuthorizationDecision(hasAuthority && hasRole) + } +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-app/src/main/resources/application.yml b/bc-gateway/bc-gateway-app/src/main/resources/application.yml index 6c13ce0e2..f754b5bc4 100644 --- a/bc-gateway/bc-gateway-app/src/main/resources/application.yml +++ b/bc-gateway/bc-gateway-app/src/main/resources/application.yml @@ -4,10 +4,11 @@ spring: name: opex-bc-gateway main: allow-bean-definition-overriding: false + allow-circular-references: true kafka: bootstrap-servers: ${KAFKA_IP_PORT:localhost:9092} consumer: - group-id: opex-bc-gateway + group-id: bc-gateway redis: host: ${REDIS_HOST:localhost} port: 6379 @@ -44,7 +45,8 @@ spring: import: vault://secret/${spring.application.name} logging: level: - org.apache.kafka: DEBUG + org.apache.kafka: ERROR + co.nilin: DEBUG swagger.authUrl: https://api.opex.dev app: diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImpl.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImpl.kt index e1f500537..fea090306 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImpl.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImpl.kt @@ -6,19 +6,19 @@ import co.nilin.opex.bcgateway.core.model.AssignedAddress import co.nilin.opex.bcgateway.core.model.Chain import co.nilin.opex.bcgateway.core.model.Currency import co.nilin.opex.bcgateway.core.spi.AssignedAddressHandler -import co.nilin.opex.bcgateway.core.spi.CurrencyLoader +import co.nilin.opex.bcgateway.core.spi.CurrencyHandler import co.nilin.opex.bcgateway.core.spi.ReservedAddressHandler import co.nilin.opex.utility.error.data.OpexError import co.nilin.opex.utility.error.data.OpexException class AssignAddressServiceImpl( - val currencyLoader: CurrencyLoader, + val currencyHandler: CurrencyHandler, val assignedAddressHandler: AssignedAddressHandler, val reservedAddressHandler: ReservedAddressHandler ) : AssignAddressService { override suspend fun assignAddress(user: String, currency: Currency): List { - val currencyInfo = currencyLoader.fetchCurrencyInfo(currency.symbol) + val currencyInfo = currencyHandler.fetchCurrencyInfo(currency.symbol) val chains = currencyInfo.implementations .map { imp -> imp.chain } val addressTypes = chains diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt index 2d81c6c0e..0bf36581d 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImpl.kt @@ -13,11 +13,11 @@ import kotlin.coroutines.coroutineContext open class ChainSyncServiceImpl( private val chainSyncSchedulerHandler: ChainSyncSchedulerHandler, - private val chainEndpointProxyFinder: ChainEndpointProxyFinder, + private val chainEndpointHandler: ChainEndpointHandler, private val chainSyncRecordHandler: ChainSyncRecordHandler, private val walletSyncRecordHandler: WalletSyncRecordHandler, private val chainSyncRetryHandler: ChainSyncRetryHandler, - private val currencyLoader: CurrencyLoader, + private val currencyHandler: CurrencyHandler, private val operator: TransactionalOperator, private val dispatcher: ExecutorCoroutineDispatcher ) : ChainSyncService { @@ -29,9 +29,9 @@ open class ChainSyncServiceImpl( val schedules = chainSyncSchedulerHandler.fetchActiveSchedules(currentTime()) schedules.map { syncSchedule -> async(dispatcher) { - val syncHandler = chainEndpointProxyFinder.findChainEndpointProxy(syncSchedule.chainName) + val syncHandler = chainEndpointHandler.findChainEndpointProxy(syncSchedule.chainName) val lastSync = chainSyncRecordHandler.loadLastSuccessRecord(syncSchedule.chainName) - val tokens = currencyLoader.findImplementationsWithTokenOnChain(syncSchedule.chainName) + val tokens = currencyHandler.findImplementationsWithTokenOnChain(syncSchedule.chainName) .map { impl -> impl.tokenAddress ?: "" } .toList() diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt index a43f688b2..538eadd98 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/service/WalletSyncServiceImpl.kt @@ -20,7 +20,7 @@ class WalletSyncServiceImpl( private val walletProxy: WalletProxy, private val walletSyncRecordHandler: WalletSyncRecordHandler, private val assignedAddressHandler: AssignedAddressHandler, - private val currencyLoader: CurrencyLoader, + private val currencyHandler: CurrencyHandler, private val dispatcher: ExecutorCoroutineDispatcher ) : WalletSyncService { @@ -39,7 +39,7 @@ class WalletSyncServiceImpl( val uuid = assignedAddressHandler.findUuid(deposit.depositor, deposit.depositorMemo) if (uuid != null) { logger.info("deposit came for $uuid - to ${deposit.depositor}") - val symbol = currencyLoader.findByChainAndTokenAddress(deposit.chain, deposit.tokenAddress) + val symbol = currencyHandler.findByChainAndTokenAddress(deposit.chain, deposit.tokenAddress) if (symbol != null) { sendDeposit(uuid, symbol, deposit) deposited = true diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/AddressTypeHandler.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/AddressTypeHandler.kt new file mode 100644 index 000000000..6a60f06aa --- /dev/null +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/AddressTypeHandler.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.bcgateway.core.spi + +import co.nilin.opex.bcgateway.core.model.AddressType + +interface AddressTypeHandler { + + suspend fun fetchAll(): List + + suspend fun addAddressType(name: String, addressRegex: String, memoRegex: String?) + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainEndpointHandler.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainEndpointHandler.kt new file mode 100644 index 000000000..b77c5cbe6 --- /dev/null +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainEndpointHandler.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.bcgateway.core.spi + +interface ChainEndpointHandler { + + suspend fun addEndpoint(chainName: String, url: String, username: String?, password: String?) + + suspend fun deleteEndpoint(chainName: String, url: String) + + suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy +} diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainEndpointProxyFinder.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainEndpointProxyFinder.kt deleted file mode 100644 index f4762765a..000000000 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainEndpointProxyFinder.kt +++ /dev/null @@ -1,5 +0,0 @@ -package co.nilin.opex.bcgateway.core.spi - -interface ChainEndpointProxyFinder { - suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy -} diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainLoader.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainLoader.kt index 41a13a183..ac274c2e0 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainLoader.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainLoader.kt @@ -3,5 +3,10 @@ package co.nilin.opex.bcgateway.core.spi import co.nilin.opex.bcgateway.core.model.Chain interface ChainLoader { + + suspend fun addChain(name: String, addressType:String):Chain + + suspend fun fetchAllChains():List + suspend fun fetchChainInfo(chain: String): Chain } diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt index 223702b6c..61ca28be8 100644 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/ChainSyncSchedulerHandler.kt @@ -4,6 +4,10 @@ import co.nilin.opex.bcgateway.core.model.ChainSyncSchedule import java.time.LocalDateTime interface ChainSyncSchedulerHandler { + suspend fun fetchActiveSchedules(time: LocalDateTime): List + suspend fun prepareScheduleForNextTry(syncSchedule: ChainSyncSchedule, success: Boolean) + + suspend fun scheduleChain(chain: String, delaySeconds: Int, errorDelaySeconds: Int) } diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/CurrencyHandler.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/CurrencyHandler.kt new file mode 100644 index 000000000..5eb650f2a --- /dev/null +++ b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/CurrencyHandler.kt @@ -0,0 +1,36 @@ +package co.nilin.opex.bcgateway.core.spi + +import co.nilin.opex.bcgateway.core.model.CurrencyImplementation +import co.nilin.opex.bcgateway.core.model.CurrencyInfo + +interface CurrencyHandler { + + suspend fun addCurrency(name: String, symbol: String) + + suspend fun editCurrency(name: String, symbol: String) + + suspend fun deleteCurrency(name: String) + + suspend fun addCurrencyImplementation( + symbol: String, + chain: String, + tokenName: String?, + tokenAddress: String?, + isToken: Boolean, + withdrawFee: Double, + minimumWithdraw: Double, + isWithdrawEnabled: Boolean, + decimal: Int + ): CurrencyImplementation + + suspend fun fetchAllImplementations(): List + + suspend fun fetchCurrencyInfo(symbol: String): CurrencyInfo + + suspend fun findByChainAndTokenAddress(chain: String, address: String?): CurrencyImplementation? + + suspend fun findImplementationsWithTokenOnChain(chain: String): List + + suspend fun changeWithdrawStatus(symbol: String, chain: String, status: Boolean) + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/CurrencyLoader.kt b/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/CurrencyLoader.kt deleted file mode 100644 index d275d263b..000000000 --- a/bc-gateway/bc-gateway-core/src/main/kotlin/co/nilin/opex/bcgateway/core/spi/CurrencyLoader.kt +++ /dev/null @@ -1,10 +0,0 @@ -package co.nilin.opex.bcgateway.core.spi - -import co.nilin.opex.bcgateway.core.model.CurrencyImplementation -import co.nilin.opex.bcgateway.core.model.CurrencyInfo - -interface CurrencyLoader { - suspend fun fetchCurrencyInfo(symbol: String): CurrencyInfo - suspend fun findByChainAndTokenAddress(chain: String, address: String?): CurrencyImplementation? - suspend fun findImplementationsWithTokenOnChain(chain: String): List -} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImplUnitTest.kt b/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImplUnitTest.kt index ccd60628d..540849968 100644 --- a/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImplUnitTest.kt +++ b/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/AssignAddressServiceImplUnitTest.kt @@ -3,7 +3,7 @@ package co.nilin.opex.bcgateway.core.service import co.nilin.opex.bcgateway.core.model.* import co.nilin.opex.bcgateway.core.model.Currency import co.nilin.opex.bcgateway.core.spi.AssignedAddressHandler -import co.nilin.opex.bcgateway.core.spi.CurrencyLoader +import co.nilin.opex.bcgateway.core.spi.CurrencyHandler import co.nilin.opex.bcgateway.core.spi.ReservedAddressHandler import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Assertions @@ -16,7 +16,7 @@ import java.util.* class AssignAddressServiceImplUnitTest { @Mock - lateinit var currencyLoader: CurrencyLoader + lateinit var currencyHandler: CurrencyHandler @Mock lateinit var assignedAddressHandler: AssignedAddressHandler @@ -36,7 +36,7 @@ class AssignAddressServiceImplUnitTest { init { MockitoAnnotations.openMocks(this) assignAddressServiceImpl = AssignAddressServiceImpl( - currencyLoader, assignedAddressHandler, reservedAddressHandler + currencyHandler, assignedAddressHandler, reservedAddressHandler ) runBlocking { val eth = @@ -53,7 +53,7 @@ class AssignAddressServiceImplUnitTest { 18 ) - Mockito.`when`(currencyLoader.fetchCurrencyInfo(currency.symbol)) + Mockito.`when`(currencyHandler.fetchCurrencyInfo(currency.symbol)) .thenReturn(CurrencyInfo(currency, listOf(eth, wrappedEth))) } diff --git a/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt b/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt index c7b60af47..c4c406328 100644 --- a/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt +++ b/bc-gateway/bc-gateway-core/src/test/kotlin/co/nilin/opex/bcgateway/core/service/ChainSyncServiceImplTest.kt @@ -29,7 +29,7 @@ internal class ChainSyncServiceImplTest { lateinit var chainSyncSchedulerHandler: ChainSyncSchedulerHandler @Mock - lateinit var chainEndpointProxyFinder: ChainEndpointProxyFinder + lateinit var chainEndpointHandler: ChainEndpointHandler @Mock lateinit var chainSyncRecordHandler: ChainSyncRecordHandler @@ -41,25 +41,25 @@ internal class ChainSyncServiceImplTest { lateinit var chainSyncRetryHandler: ChainSyncRetryHandler @Mock - lateinit var currencyLoader: CurrencyLoader + lateinit var currencyHandler: CurrencyHandler private val endpointProxy: ChainEndpointProxy = mock() init { MockitoAnnotations.openMocks(this) runBlocking { - Mockito.`when`(chainEndpointProxyFinder.findChainEndpointProxy(ethChain)) + Mockito.`when`(chainEndpointHandler.findChainEndpointProxy(ethChain)) .thenReturn(endpointProxy) - Mockito.`when`(currencyLoader.findImplementationsWithTokenOnChain(ethChain)).thenReturn(emptyList()) + Mockito.`when`(currencyHandler.findImplementationsWithTokenOnChain(ethChain)).thenReturn(emptyList()) } syncService = object : ChainSyncServiceImpl( chainSyncSchedulerHandler, - chainEndpointProxyFinder, + chainEndpointHandler, chainSyncRecordHandler, walletSyncRecordHandler, chainSyncRetryHandler, - currencyLoader, + currencyHandler, OPERATOR, Executors.newFixedThreadPool(2).asCoroutineDispatcher() ) { @@ -78,10 +78,10 @@ internal class ChainSyncServiceImplTest { //then verifyNoMoreInteractions( - chainEndpointProxyFinder, + chainEndpointHandler, chainSyncRecordHandler, walletSyncRecordHandler, - currencyLoader + currencyHandler ) } } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml new file mode 100644 index 000000000..d4ca2898c --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/pom.xml @@ -0,0 +1,57 @@ + + + 4.0.0 + + + co.nilin.opex.bcgateway + bc-gateway + 1.0-SNAPSHOT + ../../pom.xml + + + co.nilin.opex.bcgateway.ports.kafka.listener + bc-gateway-eventlistener-kafka + bc-gateway-eventlistener-kafka + bc-gateway kafka listener of Opex + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework.kafka + spring-kafka + + + io.projectreactor.kotlin + reactor-kotlin-extensions + + + org.jetbrains.kotlin + kotlin-reflect + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + + + co.nilin.opex.bcgateway.core + bc-gateway-core + + + org.springframework.kafka + spring-kafka-test + test + + + diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaConfig.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaConfig.kt new file mode 100644 index 000000000..19d44e68d --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/config/KafkaConfig.kt @@ -0,0 +1,58 @@ +package co.nilin.opex.bcgateway.ports.kafka.listener.config + +import co.nilin.opex.bcgateway.ports.kafka.listener.consumer.AdminEventKafkaListener +import co.nilin.opex.bcgateway.ports.kafka.listener.model.AdminEvent +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.support.serializer.JsonDeserializer +import java.util.regex.Pattern + +@Configuration +class KafkaConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private val bootstrapServers: String? = null + + @Value("\${spring.kafka.consumer.group-id}") + private val groupId: String? = null + + @Bean + fun consumerConfigs(): Map { + return mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to groupId, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, + JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", + JsonDeserializer.TYPE_MAPPINGS to "admin_add_currency:co.nilin.opex.bcgateway.ports.kafka.listener.model.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.bcgateway.ports.kafka.listener.model.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.bcgateway.ports.kafka.listener.model.DeleteCurrencyEvent" + ) + } + + @Bean + fun adminEventsConsumerFactory(consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } + + @Autowired + @ConditionalOnBean(AdminEventKafkaListener::class) + fun configureAdminEventListener( + listener: AdminEventKafkaListener, + consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile("admin_event")) + containerProps.messageListener = listener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("AdminEventKafkaListenerContainer") + container.start() + } + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/consumer/AdminEventKafkaListener.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/consumer/AdminEventKafkaListener.kt new file mode 100644 index 000000000..d9ae19119 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/consumer/AdminEventKafkaListener.kt @@ -0,0 +1,27 @@ +package co.nilin.opex.bcgateway.ports.kafka.listener.consumer + +import co.nilin.opex.bcgateway.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.bcgateway.ports.kafka.listener.spi.AdminEventListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component + +@Component +class AdminEventKafkaListener : MessageListener { + + private val listeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + listeners.forEach { + it.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) + } + } + + fun addEventListener(tl: AdminEventListener) { + listeners.add(tl) + } + + fun removeEventListener(tl: AdminEventListener) { + listeners.removeIf { it.id() == tl.id() } + } +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/AddCurrencyEvent.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/AddCurrencyEvent.kt new file mode 100644 index 000000000..38b001e65 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/AddCurrencyEvent.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.bcgateway.ports.kafka.listener.model + +data class AddCurrencyEvent( + val name: String, + val symbol: String, + val precision: Double +) : AdminEvent() \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/AdminEvent.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/AdminEvent.kt new file mode 100644 index 000000000..cb67b1991 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/AdminEvent.kt @@ -0,0 +1,3 @@ +package co.nilin.opex.bcgateway.ports.kafka.listener.model + +abstract class AdminEvent \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/DeleteCurrencyEvent.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/DeleteCurrencyEvent.kt new file mode 100644 index 000000000..19993f1d0 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/DeleteCurrencyEvent.kt @@ -0,0 +1,3 @@ +package co.nilin.opex.bcgateway.ports.kafka.listener.model + +data class DeleteCurrencyEvent(val name: String) : AdminEvent() \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/EditCurrencyEvent.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/EditCurrencyEvent.kt new file mode 100644 index 000000000..2fc5a1fe4 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/model/EditCurrencyEvent.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.bcgateway.ports.kafka.listener.model + +data class EditCurrencyEvent( + val name: String, + val symbol: String, + val precision: Double +) : AdminEvent() \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/spi/AdminEventListener.kt b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/spi/AdminEventListener.kt new file mode 100644 index 000000000..530a6c5cb --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-eventlistener-kafka/src/main/kotlin/co/nilin/opex/bcgateway/ports/kafka/listener/spi/AdminEventListener.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.bcgateway.ports.kafka.listener.spi + +import co.nilin.opex.bcgateway.ports.kafka.listener.model.AdminEvent + +interface AdminEventListener { + + fun id(): String + + fun onEvent(event: AdminEvent, partition: Int, offset: Long, timestamp: Long) + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/AddressTypeRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/AddressTypeRepository.kt index 67d76791d..e0a32428a 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/AddressTypeRepository.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/AddressTypeRepository.kt @@ -3,6 +3,11 @@ package co.nilin.opex.bcgateway.ports.postgres.dao import co.nilin.opex.bcgateway.ports.postgres.model.AddressTypeModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono @Repository -interface AddressTypeRepository : ReactiveCrudRepository \ No newline at end of file +interface AddressTypeRepository : ReactiveCrudRepository { + + fun findByType(type: String): Mono + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainEndpointRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainEndpointRepository.kt index 057f033cb..05ec4d2d6 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainEndpointRepository.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainEndpointRepository.kt @@ -3,6 +3,11 @@ package co.nilin.opex.bcgateway.ports.postgres.dao import co.nilin.opex.bcgateway.ports.postgres.model.ChainEndpointModel import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono @Repository -interface ChainEndpointRepository : ReactiveCrudRepository \ No newline at end of file +interface ChainEndpointRepository : ReactiveCrudRepository { + + fun deleteByChainNameAndUrl(chainName: String, url: String): Mono + +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainRepository.kt index 08f0ec606..12e93c04f 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainRepository.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainRepository.kt @@ -11,11 +11,15 @@ import reactor.core.publisher.Mono @Repository interface ChainRepository : ReactiveCrudRepository { + + @Query("insert into chains values (:name) on conflict do nothing") + fun insert(name: String): Mono + fun findByName(name: String): Mono @Query( """ - select address_types.id, chain_address_types.chain_name, address_types.address_type, address_types.address_regex, address_types.memo_regex + select distinct address_types.id, chain_address_types.chain_name, address_types.address_type, address_types.address_regex, address_types.memo_regex from chain_address_types join address_types on address_types.id = chain_address_types.addr_type_id diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncScheduleRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncScheduleRepository.kt index 2b4e2603b..bfd762f29 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncScheduleRepository.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/ChainSyncScheduleRepository.kt @@ -5,10 +5,17 @@ import kotlinx.coroutines.flow.Flow import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono import java.time.LocalDateTime @Repository interface ChainSyncScheduleRepository : ReactiveCrudRepository { + + @Query("insert into chain_sync_schedules values (:chain, CURRENT_DATE, :delay, :errorDelay) on conflict do nothing") + fun insert(chain: String, delay: Int, errorDelay: Int): Mono + @Query("select * from chain_sync_schedules where retry_time <= :time") fun findActiveSchedule(time: LocalDateTime): Flow + + } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyImplementationRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyImplementationRepository.kt index af0c4898b..5829f9c84 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyImplementationRepository.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyImplementationRepository.kt @@ -13,5 +13,7 @@ interface CurrencyImplementationRepository : ReactiveCrudRepository + fun findBySymbolAndChain(symbol: String, chain: String): Mono + fun findByChainAndTokenAddress(chain: String, tokenAddress: String?): Mono } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyRepository.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyRepository.kt index 0da9a257e..2dd1e4517 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyRepository.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/dao/CurrencyRepository.kt @@ -1,11 +1,19 @@ package co.nilin.opex.bcgateway.ports.postgres.dao import co.nilin.opex.bcgateway.ports.postgres.model.CurrencyModel +import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository import reactor.core.publisher.Mono @Repository interface CurrencyRepository : ReactiveCrudRepository { + fun findBySymbol(symbol: String): Mono + + @Query("insert into currency values (:symbol, :name) on conflict do nothing") + fun insert(name: String, symbol: String): Mono + + @Query("delete from currency where name = :name") + fun deleteByName(name: String): Mono } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/AddressTypeHandlerImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/AddressTypeHandlerImpl.kt new file mode 100644 index 000000000..4fa98fc3a --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/AddressTypeHandlerImpl.kt @@ -0,0 +1,26 @@ +package co.nilin.opex.bcgateway.ports.postgres.impl + +import co.nilin.opex.bcgateway.core.model.AddressType +import co.nilin.opex.bcgateway.core.spi.AddressTypeHandler +import co.nilin.opex.bcgateway.ports.postgres.dao.AddressTypeRepository +import co.nilin.opex.bcgateway.ports.postgres.model.AddressTypeModel +import kotlinx.coroutines.reactive.awaitFirstOrElse +import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.stereotype.Component + +@Component +class AddressTypeHandlerImpl(private val repository: AddressTypeRepository) : AddressTypeHandler { + + override suspend fun fetchAll(): List { + return repository.findAll() + .collectList() + .awaitFirstOrElse { emptyList() } + .map { AddressType(it.id!!, it.type, it.addressRegex, it.memoRegex) } + } + + override suspend fun addAddressType(name: String, addressRegex: String, memoRegex: String?) { + if (repository.findByType(name).awaitFirstOrNull() == null) { + repository.save(AddressTypeModel(null, name, addressRegex, memoRegex)).awaitFirstOrNull() + } + } +} \ No newline at end of file diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainEndpointHandlerImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainEndpointHandlerImpl.kt new file mode 100644 index 000000000..59c3643d0 --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainEndpointHandlerImpl.kt @@ -0,0 +1,35 @@ +package co.nilin.opex.bcgateway.ports.postgres.impl + +import co.nilin.opex.bcgateway.core.model.Endpoint +import co.nilin.opex.bcgateway.core.spi.ChainEndpointHandler +import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy +import co.nilin.opex.bcgateway.ports.chainproxy.impl.ChainEndpointProxyImpl +import co.nilin.opex.bcgateway.ports.postgres.dao.ChainEndpointRepository +import co.nilin.opex.bcgateway.ports.postgres.dao.ChainRepository +import co.nilin.opex.bcgateway.ports.postgres.model.ChainEndpointModel +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.client.WebClient + +@Component +class ChainEndpointHandlerImpl( + private val webClient: WebClient, + private val chainRepository: ChainRepository, + private val endpointRepository: ChainEndpointRepository +) : ChainEndpointHandler { + + override suspend fun addEndpoint(chainName: String, url: String, username: String?, password: String?) { + endpointRepository.save(ChainEndpointModel(null, chainName, url, username, password)).awaitFirstOrNull() + } + + override suspend fun deleteEndpoint(chainName: String, url: String) { + endpointRepository.deleteByChainNameAndUrl(chainName, url).awaitFirstOrNull() + } + + override suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy { + val endpoints = chainRepository.findEndpointsByName(chainName).map { Endpoint(it.url) }.toList() + return ChainEndpointProxyImpl(chainName, endpoints, webClient) + } +} diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainEndpointProxyFinderImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainEndpointProxyFinderImpl.kt deleted file mode 100644 index 3bfea4b83..000000000 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainEndpointProxyFinderImpl.kt +++ /dev/null @@ -1,20 +0,0 @@ -package co.nilin.opex.bcgateway.ports.postgres.impl - -import co.nilin.opex.bcgateway.core.model.Endpoint -import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxy -import co.nilin.opex.bcgateway.core.spi.ChainEndpointProxyFinder -import co.nilin.opex.bcgateway.ports.chainproxy.impl.ChainEndpointProxyImpl -import co.nilin.opex.bcgateway.ports.postgres.dao.ChainRepository -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import org.springframework.stereotype.Component -import org.springframework.web.reactive.function.client.WebClient - -@Component -class ChainEndpointProxyFinderImpl(private val chainRepository: ChainRepository, private val webClient: WebClient) : - ChainEndpointProxyFinder { - override suspend fun findChainEndpointProxy(chainName: String): ChainEndpointProxy { - val endpoints = chainRepository.findEndpointsByName(chainName).map { Endpoint(it.url) }.toList() - return ChainEndpointProxyImpl(chainName, endpoints, webClient) - } -} diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainHandler.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainHandler.kt index 285943589..f0333e6b6 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainHandler.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainHandler.kt @@ -4,14 +4,55 @@ import co.nilin.opex.bcgateway.core.model.AddressType import co.nilin.opex.bcgateway.core.model.Chain import co.nilin.opex.bcgateway.core.model.Endpoint import co.nilin.opex.bcgateway.core.spi.ChainLoader +import co.nilin.opex.bcgateway.ports.postgres.dao.AddressTypeRepository +import co.nilin.opex.bcgateway.ports.postgres.dao.ChainAddressTypeRepository import co.nilin.opex.bcgateway.ports.postgres.dao.ChainRepository +import co.nilin.opex.bcgateway.ports.postgres.model.ChainAddressTypeModel +import co.nilin.opex.utility.error.data.OpexError +import co.nilin.opex.utility.error.data.OpexException import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrElse +import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle import org.springframework.stereotype.Component @Component -class ChainHandler(val chainRepository: ChainRepository) : ChainLoader { +class ChainHandler( + private val chainRepository: ChainRepository, + private val addressTypeRepository: AddressTypeRepository, + private val chainAddressRepository: ChainAddressTypeRepository +) : ChainLoader { + + override suspend fun addChain(name: String, addressType: String): Chain { + val chain = chainRepository.findByName(name).awaitFirstOrNull() + if (chain != null) + throw OpexException(OpexError.BadRequest) + + val type = addressTypeRepository.findByType(addressType).awaitFirstOrNull() + ?: throw OpexException(OpexError.InvalidAddressType) + + chainRepository.insert(name).awaitFirstOrNull() + val model = chainRepository.findByName(name).awaitFirst() + chainAddressRepository.save(ChainAddressTypeModel(null, model.name, type.id!!)).awaitFirstOrNull() + return Chain(model.name, emptyList(), emptyList()) + } + + override suspend fun fetchAllChains(): List { + return chainRepository.findAll() + .collectList() + .awaitFirstOrElse { emptyList() } + .map { c -> + val addressTypes = chainRepository.findAddressTypesByName(c.name) + .map { AddressType(it.id!!, it.type, it.addressRegex, it.memoRegex) } + .toList() + + val endpoints = chainRepository.findEndpointsByName(c.name).map { Endpoint(it.url) }.toList() + Chain(c.name, addressTypes, endpoints) + } + } + override suspend fun fetchChainInfo(chain: String): Chain { val chainDao = chainRepository.findByName(chain).awaitSingle() val addressTypes = chainRepository.findAddressTypesByName(chain) @@ -19,4 +60,5 @@ class ChainHandler(val chainRepository: ChainRepository) : ChainLoader { val endpoints = chainRepository.findEndpointsByName(chain).map { Endpoint(it.url) }.toList() return Chain(chainDao.name, addressTypes, endpoints) } + } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt index cc525f724..d01ab9010 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/ChainSyncSchedulerHandlerImpl.kt @@ -7,6 +7,8 @@ import co.nilin.opex.bcgateway.ports.postgres.model.ChainSyncScheduleModel import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingleOrNull import org.springframework.stereotype.Component import java.time.LocalDateTime import java.time.temporal.ChronoUnit @@ -30,4 +32,16 @@ class ChainSyncSchedulerHandlerImpl(private val chainSyncScheduleRepository: Cha val dao = ChainSyncScheduleModel(chain, time, syncSchedule.delay, syncSchedule.errorDelay) chainSyncScheduleRepository.save(dao).awaitFirst() } + + override suspend fun scheduleChain(chain: String, delaySeconds: Int, errorDelaySeconds: Int) { + with(chainSyncScheduleRepository.findById(chain).awaitSingleOrNull()) { + if (this != null) { + delay = delaySeconds.toLong() + errorDelay = errorDelaySeconds.toLong() + chainSyncScheduleRepository.save(this).awaitFirstOrNull() + } else { + chainSyncScheduleRepository.insert(chain, delaySeconds, errorDelaySeconds).awaitFirstOrNull() + } + } + } } diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/CurrencyHandlerImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/CurrencyHandlerImpl.kt new file mode 100644 index 000000000..9f083022b --- /dev/null +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/CurrencyHandlerImpl.kt @@ -0,0 +1,165 @@ +package co.nilin.opex.bcgateway.ports.postgres.impl + +import co.nilin.opex.bcgateway.core.model.* +import co.nilin.opex.bcgateway.core.spi.CurrencyHandler +import co.nilin.opex.bcgateway.ports.postgres.dao.ChainRepository +import co.nilin.opex.bcgateway.ports.postgres.dao.CurrencyImplementationRepository +import co.nilin.opex.bcgateway.ports.postgres.dao.CurrencyRepository +import co.nilin.opex.bcgateway.ports.postgres.model.CurrencyImplementationModel +import co.nilin.opex.bcgateway.ports.postgres.model.CurrencyModel +import co.nilin.opex.utility.error.data.OpexError +import co.nilin.opex.utility.error.data.OpexException +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrElse +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactor.awaitSingleOrNull +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class CurrencyHandlerImpl( + private val chainRepository: ChainRepository, + private val currencyRepository: CurrencyRepository, + private val currencyImplementationRepository: CurrencyImplementationRepository +) : CurrencyHandler { + + private val logger = LoggerFactory.getLogger(CurrencyHandler::class.java) + + override suspend fun addCurrency(name: String, symbol: String) { + try { + currencyRepository.insert(name, symbol.uppercase()).awaitSingleOrNull() + } catch (e: Exception) { + logger.error("Could not insert new currency $name", e) + } + } + + override suspend fun editCurrency(name: String, symbol: String) { + val currency = currencyRepository.findBySymbol(symbol).awaitFirstOrNull() + if (currency != null) { + currency.name = name + currencyRepository.save(currency).awaitFirst() + } + } + + override suspend fun deleteCurrency(name: String) { + try { + currencyRepository.deleteByName(name).awaitFirstOrNull() + } catch (e: Exception) { + logger.error("Could not delete currency $name", e) + } + } + + override suspend fun addCurrencyImplementation( + symbol: String, + chain: String, + tokenName: String?, + tokenAddress: String?, + isToken: Boolean, + withdrawFee: Double, + minimumWithdraw: Double, + isWithdrawEnabled: Boolean, + decimal: Int + ): CurrencyImplementation { + val chainModel = chainRepository.findByName(chain.lowercase()).awaitFirstOrNull() + ?: throw OpexException(OpexError.ChainNotFound) + + currencyImplementationRepository.findBySymbolAndChain(symbol.uppercase(), chain) + .awaitFirstOrNull() + ?.let { throw OpexException(OpexError.DuplicateToken) } + + val currency = currencyRepository.findBySymbol(symbol.uppercase()).awaitFirstOrNull() + ?: throw OpexException(OpexError.CurrencyNotFoundBC) + + val model = currencyImplementationRepository.save( + CurrencyImplementationModel( + null, + symbol.uppercase(), + chainModel.name, + isToken, + tokenAddress, + tokenName, + isWithdrawEnabled, + withdrawFee.toBigDecimal(), + minimumWithdraw.toBigDecimal(), + decimal + ) + ).awaitFirst() + + logger.info("Add currency implementation: ${model.symbol} - ${model.chain}") + + return projectCurrencyImplementation(model, currency) + } + + override suspend fun fetchAllImplementations(): List { + return currencyImplementationRepository.findAll() + .collectList() + .awaitFirstOrElse { emptyList() } + .map { + val currency = currencyRepository.findBySymbol(it.symbol).awaitFirstOrNull() + projectCurrencyImplementation(it, currency) + } + } + + override suspend fun fetchCurrencyInfo(symbol: String): CurrencyInfo { + val symbolUpperCase = symbol.uppercase() + val currencyModel = currencyRepository.findBySymbol(symbolUpperCase).awaitSingleOrNull() + if (currencyModel === null) { + return CurrencyInfo(Currency("", symbolUpperCase), emptyList()) + } + val currencyImplModel = currencyImplementationRepository.findBySymbol(symbolUpperCase) + val currency = Currency(currencyModel.symbol, currencyModel.name) + val implementations = currencyImplModel.map { projectCurrencyImplementation(it, currencyModel) } + return CurrencyInfo(currency, implementations.toList()) + } + + override suspend fun findByChainAndTokenAddress(chain: String, address: String?): CurrencyImplementation? { + val impl = currencyImplementationRepository.findByChainAndTokenAddress(chain, address) + .awaitFirstOrNull() + + return if (impl != null) + projectCurrencyImplementation(impl) + else + null + } + + override suspend fun findImplementationsWithTokenOnChain(chain: String): List { + return currencyImplementationRepository.findByChain(chain).map { projectCurrencyImplementation(it) }.toList() + } + + override suspend fun changeWithdrawStatus(symbol: String, chain: String, status: Boolean) { + val impl = currencyImplementationRepository.findBySymbolAndChain(symbol, chain).awaitSingleOrNull() + ?: throw OpexException(OpexError.TokenNotFound) + + impl.apply { + withdrawEnabled = status + currencyImplementationRepository.save(impl).awaitFirstOrNull() + } + } + + private suspend fun projectCurrencyImplementation( + currencyImplementationModel: CurrencyImplementationModel, + currencyModel: CurrencyModel? = null + ): CurrencyImplementation { + val addressTypesModel = chainRepository.findAddressTypesByName(currencyImplementationModel.chain) + val addressTypes = addressTypesModel.map { AddressType(it.id!!, it.type, it.addressRegex, it.memoRegex) } + val endpointsModel = chainRepository.findEndpointsByName(currencyImplementationModel.chain) + val endpoints = endpointsModel.map { Endpoint(it.url) } + val currencyModelVal = + currencyModel ?: currencyRepository.findBySymbol(currencyImplementationModel.symbol).awaitSingle() + val currency = Currency(currencyModelVal.symbol, currencyModelVal.name) + return CurrencyImplementation( + currency, + Chain(currencyImplementationModel.chain, addressTypes.toList(), endpoints.toList()), + currencyImplementationModel.token, + currencyImplementationModel.tokenAddress, + currencyImplementationModel.tokenName, + currencyImplementationModel.withdrawEnabled, + currencyImplementationModel.withdrawFee, + currencyImplementationModel.withdrawMin, + currencyImplementationModel.decimal + ) + } +} diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/CurrencyLoaderImpl.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/CurrencyLoaderImpl.kt deleted file mode 100644 index 31c3e3efe..000000000 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/impl/CurrencyLoaderImpl.kt +++ /dev/null @@ -1,73 +0,0 @@ -package co.nilin.opex.bcgateway.ports.postgres.impl - -import co.nilin.opex.bcgateway.core.model.* -import co.nilin.opex.bcgateway.core.spi.CurrencyLoader -import co.nilin.opex.bcgateway.ports.postgres.dao.ChainRepository -import co.nilin.opex.bcgateway.ports.postgres.dao.CurrencyImplementationRepository -import co.nilin.opex.bcgateway.ports.postgres.dao.CurrencyRepository -import co.nilin.opex.bcgateway.ports.postgres.model.CurrencyImplementationModel -import co.nilin.opex.bcgateway.ports.postgres.model.CurrencyModel -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.reactive.awaitFirstOrNull -import kotlinx.coroutines.reactive.awaitSingle -import kotlinx.coroutines.reactive.awaitSingleOrNull -import org.springframework.stereotype.Component - -@Component -class CurrencyLoaderImpl( - private val chainRepository: ChainRepository, - private val currencyRepository: CurrencyRepository, - private val currencyImplementationRepository: CurrencyImplementationRepository -) : CurrencyLoader { - - override suspend fun fetchCurrencyInfo(symbol: String): CurrencyInfo { - val symbolUpperCase = symbol.toUpperCase() - val currencyModel = currencyRepository.findBySymbol(symbolUpperCase).awaitSingleOrNull() - if (currencyModel === null) { - return CurrencyInfo(Currency("", symbolUpperCase), emptyList()) - } - val currencyImplModel = currencyImplementationRepository.findBySymbol(symbolUpperCase) - val currency = Currency(currencyModel.symbol, currencyModel.name) - val implementations = currencyImplModel.map { projectCurrencyImplementation(it, currencyModel) } - return CurrencyInfo(currency, implementations.toList()) - } - - override suspend fun findByChainAndTokenAddress(chain: String, address: String?): CurrencyImplementation? { - val impl = currencyImplementationRepository.findByChainAndTokenAddress(chain, address) - .awaitFirstOrNull() - - return if (impl != null) - projectCurrencyImplementation(impl) - else - null - } - - override suspend fun findImplementationsWithTokenOnChain(chain: String): List { - return currencyImplementationRepository.findByChain(chain).map { projectCurrencyImplementation(it) }.toList() - } - - private suspend fun projectCurrencyImplementation( - currencyImplementationModel: CurrencyImplementationModel, - currencyModel: CurrencyModel? = null - ): CurrencyImplementation { - val addressTypesModel = chainRepository.findAddressTypesByName(currencyImplementationModel.chain) - val addressTypes = addressTypesModel.map { AddressType(it.id!!, it.type, it.addressRegex, it.memoRegex) } - val endpointsModel = chainRepository.findEndpointsByName(currencyImplementationModel.chain) - val endpoints = endpointsModel.map { Endpoint(it.url) } - val currencyModelVal = - currencyModel ?: currencyRepository.findBySymbol(currencyImplementationModel.symbol).awaitSingle() - val currency = Currency(currencyModelVal.symbol, currencyModelVal.name) - return CurrencyImplementation( - currency, - Chain(currencyImplementationModel.chain, addressTypes.toList(), endpoints.toList()), - currencyImplementationModel.token, - currencyImplementationModel.tokenAddress, - currencyImplementationModel.tokenName, - currencyImplementationModel.withdrawEnabled, - currencyImplementationModel.withdrawFee, - currencyImplementationModel.withdrawMin, - currencyImplementationModel.decimal - ) - } -} diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt index 92410fa9f..41e311a0f 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/ChainSyncModel.kt @@ -12,9 +12,9 @@ data class ChainSyncScheduleModel( val chain: String, @Column("retry_time") val retryTime: LocalDateTime, - val delay: Long, + var delay: Long, @Column("error_delay") - val errorDelay: Long + var errorDelay: Long ) @Table("chain_sync_records") diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyImplementationModel.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyImplementationModel.kt index 2d1b17576..01e6e4e1a 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyImplementationModel.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyImplementationModel.kt @@ -8,14 +8,14 @@ import java.math.BigDecimal @Table("currency_implementations") class CurrencyImplementationModel( - @Id val id: Long?, + @Id var id: Long?, @Column("symbol") val symbol: String, @Column("chain") val chain: String, @Column("token") val token: Boolean, - @Column("token_address") val tokenAddress: String?, - @Column("token_name") val tokenName: String?, - @Column("withdraw_enabled") val withdrawEnabled: Boolean, - @Column("withdraw_fee") val withdrawFee: BigDecimal, - @Column("withdraw_min") val withdrawMin: BigDecimal, - @Column("decimal") val decimal: Int + @Column("token_address") var tokenAddress: String?, + @Column("token_name") var tokenName: String?, + @Column("withdraw_enabled") var withdrawEnabled: Boolean, + @Column("withdraw_fee") var withdrawFee: BigDecimal, + @Column("withdraw_min") var withdrawMin: BigDecimal, + @Column("decimal") var decimal: Int ) diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyModel.kt b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyModel.kt index 1ba9b1523..92840f473 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyModel.kt +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/kotlin/co/nilin/opex/bcgateway/ports/postgres/model/CurrencyModel.kt @@ -8,5 +8,5 @@ import org.springframework.data.relational.core.mapping.Table @Table("currency") class CurrencyModel( @Id @Column("symbol") val symbol: String, - @Column("name") val name: String + @Column("name") var name: String ) diff --git a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql index ce91edf34..8eca3d0e5 100644 --- a/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql +++ b/bc-gateway/bc-gateway-ports/bc-gateway-persister-postgres/src/main/resources/schema.sql @@ -50,7 +50,8 @@ CREATE TABLE IF NOT EXISTS chain_endpoints chain_name VARCHAR(72) NOT NULL REFERENCES chains (name), endpoint_url VARCHAR(255) NOT NULL, endpoint_user VARCHAR(72), - endpoint_password VARCHAR(72) + endpoint_password VARCHAR(72), + UNIQUE (chain_name, endpoint_url) ); CREATE TABLE IF NOT EXISTS chain_sync_schedules diff --git a/bc-gateway/pom.xml b/bc-gateway/pom.xml index 34cf00bb6..1df44f1cd 100644 --- a/bc-gateway/pom.xml +++ b/bc-gateway/pom.xml @@ -20,6 +20,7 @@ bc-gateway-ports/bc-gateway-persister-postgres bc-gateway-ports/bc-gateway-chain-proxy bc-gateway-ports/bc-gateway-wallet-proxy + bc-gateway-ports/bc-gateway-eventlistener-kafka @@ -51,6 +52,11 @@ bc-gateway-chain-proxy ${project.version} + + co.nilin.opex.bcgateway.ports.kafka.listener + bc-gateway-eventlistener-kafka + ${project.version} + co.nilin.opex.utility.error error-handler diff --git a/docker-compose.yml b/docker-compose.yml index 5cc2fd627..71c079e68 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -417,10 +417,15 @@ services: - JAVA_OPTS=-Xmx256m -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 - SPRING_PROFILES_DEFAULT=scheduled - CONSUL_HOST=consul + - KAFKA_IP_PORT=kafka-1:29092,kafka-2:29092,kafka-3:29092 - DB_IP_PORT=postgres-bc-gateway - BACKEND_USER=${BACKEND_USER} - VAULT_HOST=vault depends_on: + - zookeeper + - kafka-1 + - kafka-2 + - kafka-3 - redis - consul - vault @@ -454,6 +459,7 @@ services: - KAFKA_IP_PORT=kafka-1:29092,kafka-2:29092,kafka-3:29092 - CONSUL_HOST=consul - VAULT_HOST=vault + - BACKEND_USER=${BACKEND_USER} volumes: - $DATA/admin-data:/admin depends_on: diff --git a/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/config/SecurityConfig.kt b/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/config/SecurityConfig.kt index 0b1e5b33e..c5b4017c9 100644 --- a/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/config/SecurityConfig.kt +++ b/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/config/SecurityConfig.kt @@ -1,6 +1,6 @@ package co.nilin.opex.storage.app.config -import co.nilin.opex.storage.app.utils.hasRealmRole +import co.nilin.opex.storage.app.utils.hasRole import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity @@ -24,7 +24,7 @@ class SecurityConfig(private val webClient: WebClient) { .pathMatchers("/actuator/**").permitAll() .pathMatchers("/swagger-ui/**").permitAll() .pathMatchers("/swagger-resources/**").permitAll() - .pathMatchers("/admin/**").hasRealmRole("SCOPE_trust", "finance-admin") + .pathMatchers("/admin/**").hasRole("SCOPE_trust", "finance-admin") .pathMatchers("/**").hasAuthority("SCOPE_trust") .anyExchange().authenticated() .and() diff --git a/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/utils/Extensions.kt b/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/utils/Extensions.kt index 7f2866eaf..b7aa569e5 100644 --- a/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/utils/Extensions.kt +++ b/storage/storage-app/src/main/kotlin/co/nilin/opex/storage/app/utils/Extensions.kt @@ -1,19 +1,17 @@ package co.nilin.opex.storage.app.utils import com.nimbusds.jose.shaded.json.JSONArray -import com.nimbusds.jose.shaded.json.JSONObject import org.springframework.security.authorization.AuthorizationDecision import org.springframework.security.config.web.server.ServerHttpSecurity import org.springframework.security.oauth2.jwt.Jwt -fun ServerHttpSecurity.AuthorizeExchangeSpec.Access.hasRealmRole( +fun ServerHttpSecurity.AuthorizeExchangeSpec.Access.hasRole( authority: String, role: String ): ServerHttpSecurity.AuthorizeExchangeSpec = access { mono, _ -> mono.map { auth -> - auth.authorities.any { it.authority == authority } - && (((auth.principal as Jwt).claims["realm_access"] as JSONObject)["roles"] as JSONArray).contains(role) - }.map { granted -> - AuthorizationDecision(granted) + val hasAuthority = auth.authorities.any { it.authority == authority } + val hasRole = ((auth.principal as Jwt).claims["roles"] as JSONArray?)?.contains(role) == true + AuthorizationDecision(hasAuthority && hasRole) } } \ No newline at end of file diff --git a/user-management/keycloak-gateway/src/main/resources/opex-realm.json b/user-management/keycloak-gateway/src/main/resources/opex-realm.json index d5913497e..0d480eb6d 100644 --- a/user-management/keycloak-gateway/src/main/resources/opex-realm.json +++ b/user-management/keycloak-gateway/src/main/resources/opex-realm.json @@ -42,10 +42,23 @@ "failureFactor": 30, "roles": { "realm": [ + { + "id": "fa6b43a1-c4eb-41d9-8f6f-b7fbc3ce6579", + "name": "system-admin", + "composite": false, + "clientRole": false, + "containerId": "opex", + "attributes": {} + }, { "id": "fe152bae-77c5-485c-be97-a5f490b3b837", "name": "finance-admin", - "composite": false, + "composite": true, + "composites": { + "realm": [ + "impersonation" + ] + }, "clientRole": false, "containerId": "opex", "attributes": {} @@ -406,6 +419,15 @@ } }, "groups": [ + { + "id": "700a0042-6146-42fc-a97a-f0f63f913301", + "name": "admins", + "path": "/admins", + "attributes": {}, + "realmRoles": [], + "clientRoles": {}, + "subGroups": [] + }, { "id": "efb76f91-62a0-409d-afea-c76a9766c6f9", "name": "finance-admin", @@ -599,17 +621,28 @@ { "client": "admin-cli", "roles": [ + "system-admin", "finance-admin" ] }, { "client": "opex-admin", "roles": [ + "system-admin", + "impersonation", + "finance-admin", "offline_access", "uma_authorization", "user" ] }, + { + "client": "web-app", + "roles": [ + "system-admin", + "finance-admin" + ] + }, { "clientScope": "offline_access", "roles": [ @@ -1459,6 +1492,37 @@ "name": "manage" } ] + }, + { + "name": "client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "Client", + "ownerManagedAccess": false, + "attributes": {}, + "_id": "c1a93d24-8a80-4652-aeb4-008faa57d9da", + "uris": [], + "scopes": [ + { + "name": "view" + }, + { + "name": "map-roles-client-scope" + }, + { + "name": "configure" + }, + { + "name": "map-roles" + }, + { + "name": "manage" + }, + { + "name": "token-exchange" + }, + { + "name": "map-roles-composite" + } + ] } ], "policies": [ @@ -1469,7 +1533,7 @@ "logic": "POSITIVE", "decisionStrategy": "UNANIMOUS", "config": { - "clients": "[\"account-console\"]" + "clients": "[\"opex-admin\",\"account-console\"]" } }, { @@ -1692,6 +1756,83 @@ "resources": "[\"Users\"]", "scopes": "[\"user-impersonated\"]" } + }, + { + "id": "93e57b9b-777a-45bd-a739-0aa9458cf54e", + "name": "view.permission.client.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976\"]", + "scopes": "[\"view\"]" + } + }, + { + "id": "cc39f7bb-34e0-4941-bb24-5bdf9b893e41", + "name": "manage.permission.client.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976\"]", + "scopes": "[\"manage\"]" + } + }, + { + "id": "a6d0689a-95e9-4f4a-b6d4-4f79dd9a5867", + "name": "configure.permission.client.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976\"]", + "scopes": "[\"configure\"]" + } + }, + { + "id": "9230d8ee-0496-42e4-a46c-f01570df8ca1", + "name": "map-roles.permission.client.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976\"]", + "scopes": "[\"map-roles\"]" + } + }, + { + "id": "63fce897-c89b-456b-b91d-4d48981a0210", + "name": "map-roles-client-scope.permission.client.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976\"]", + "scopes": "[\"map-roles-client-scope\"]" + } + }, + { + "id": "8384fef0-7ac4-4e83-9029-3e48fb42144e", + "name": "map-roles-composite.permission.client.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976\"]", + "scopes": "[\"map-roles-composite\"]" + } + }, + { + "id": "c8a6c409-c358-41d5-aa70-30282b7b8872", + "name": "token-exchange.permission.client.fb5f91c4-42fa-4769-b45d-febef22b4976", + "type": "scope", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"client.resource.fb5f91c4-42fa-4769-b45d-febef22b4976\"]", + "scopes": "[\"token-exchange\"]" + } } ], "scopes": [ @@ -2507,7 +2648,23 @@ "attributes": { "include.in.token.scope": "true", "display.on.consent.screen": "true" - } + }, + "protocolMappers": [ + { + "id": "2bafcd16-ff19-4f72-adb4-c1735793842d", + "name": "User roles", + "protocol": "openid-connect", + "protocolMapper": "oidc-usermodel-realm-role-mapper", + "consentRequired": false, + "config": { + "id.token.claim": "true", + "access.token.claim": "true", + "claim.name": "roles", + "multivalued": "true", + "userinfo.token.claim": "true" + } + } + ] } ], "defaultDefaultClientScopes": [ diff --git a/utility/error-handler/src/main/kotlin/co/nilin/opex/utility/error/data/OpexError.kt b/utility/error-handler/src/main/kotlin/co/nilin/opex/utility/error/data/OpexError.kt index abcfdc46f..a7f76d4df 100644 --- a/utility/error-handler/src/main/kotlin/co/nilin/opex/utility/error/data/OpexError.kt +++ b/utility/error-handler/src/main/kotlin/co/nilin/opex/utility/error/data/OpexError.kt @@ -12,6 +12,7 @@ enum class OpexError(val code: Int, val message: String?, val status: HttpStatus Forbidden(1004, "Forbidden", HttpStatus.FORBIDDEN), NotFound(1005, "Not found", HttpStatus.NOT_FOUND), InvalidRequestParam(1020, "Parameter '%s' is either missing or invalid", HttpStatus.BAD_REQUEST), + InvalidRequestBody(1021, "Request body is invalid", HttpStatus.BAD_REQUEST), // code 2000: accountant InvalidPair(2001, "%s is not available", HttpStatus.BAD_REQUEST), @@ -41,7 +42,12 @@ enum class OpexError(val code: Int, val message: String?, val status: HttpStatus InvalidInterval(7007, "Invalid interval", HttpStatus.BAD_REQUEST), // code 8000: bc-gateway - ReservedAddressNotAvailable(8001, "No reserved address available", HttpStatus.BAD_REQUEST); + ReservedAddressNotAvailable(8001, "No reserved address available", HttpStatus.BAD_REQUEST), + DuplicateToken(8002, "Asset already exists", HttpStatus.BAD_REQUEST), + ChainNotFound(8003, "Chain not found", HttpStatus.NOT_FOUND), + CurrencyNotFoundBC(8004, "Currency not found", HttpStatus.NOT_FOUND), + TokenNotFound(8005, "Coin/Token not found", HttpStatus.NOT_FOUND), + InvalidAddressType(8006, "Address type is invalid", HttpStatus.NOT_FOUND); companion object { fun findByCode(code: Int?): OpexError? { diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt index 291c58695..6d7003be9 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/AppConfig.kt @@ -1,49 +1,26 @@ package co.nilin.opex.wallet.app.config -import co.nilin.opex.wallet.app.service.UserRegistrationService +import co.nilin.opex.wallet.ports.kafka.listener.consumer.AdminEventKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.consumer.UserCreatedKafkaListener -import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.AdminEventListener import co.nilin.opex.wallet.ports.kafka.listener.spi.UserCreatedEventListener -import kotlinx.coroutines.runBlocking import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Configuration -import org.springframework.stereotype.Component import java.math.BigDecimal @Configuration class AppConfig { - @Value("\${app.gift.symbol}") - val symbol: String? = null - - @Value("\${app.gift.amount}") - val amount: BigDecimal? = null - - @Autowired - fun configureUserCreatedEventListener( + fun configureEventListeners( useCreatedKafkaListener: UserCreatedKafkaListener, - userCreatedEventListener: UserCreatedEventListener + userCreatedEventListener: UserCreatedEventListener, + adminKafkaEventListener: AdminEventKafkaListener, + adminEventListener: AdminEventListener, ) { useCreatedKafkaListener.addEventListener(userCreatedEventListener) + adminKafkaEventListener.addEventListener(adminEventListener) } - @Component - class WalletUserCreatedEventListener( - val userRegistrationService: UserRegistrationService - ) : UserCreatedEventListener { - - override fun id(): String { - return "UserCreatedEventListener" - } - - override fun onEvent(event: UserCreatedEvent, partition: Int, offset: Long, timestamp: Long) { - println("UserCreatedEvent " + event) - runBlocking(AppDispatchers.kafkaExecutor) { - userRegistrationService.registerNewUser(event) - } - println("onUserCreatedEvent") - } - } } \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt index d5f49240a..863b7f2f8 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt @@ -1,13 +1,10 @@ package co.nilin.opex.wallet.app.config -import co.nilin.opex.wallet.app.utils.hasRealmRole -import net.minidev.json.JSONArray +import co.nilin.opex.wallet.app.utils.hasRole import org.springframework.beans.factory.annotation.Value import org.springframework.context.annotation.Bean -import org.springframework.security.authorization.AuthorizationDecision import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity import org.springframework.security.config.web.server.ServerHttpSecurity -import org.springframework.security.oauth2.jwt.Jwt import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder import org.springframework.security.web.server.SecurityWebFilterChain @@ -29,7 +26,7 @@ class SecurityConfig(private val webClient: WebClient) { .pathMatchers("/withdraw").hasAuthority("SCOPE_trust") .pathMatchers("/withdraw/**").hasAuthority("SCOPE_trust") .pathMatchers("/transaction/**").hasAuthority("SCOPE_trust") - .pathMatchers("/admin/**").hasRealmRole("SCOPE_trust","finance-admin") + .pathMatchers("/admin/**").hasRole("SCOPE_trust","finance-admin") .pathMatchers("/payment/internal/**").permitAll() .pathMatchers("/**").permitAll() .anyExchange().authenticated() diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/AdminController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/AdminController.kt new file mode 100644 index 000000000..e9f563b50 --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/AdminController.kt @@ -0,0 +1,88 @@ +package co.nilin.opex.wallet.app.controller + +import co.nilin.opex.wallet.core.inout.WithdrawAcceptCommand +import co.nilin.opex.wallet.core.inout.WithdrawRejectCommand +import co.nilin.opex.wallet.core.inout.WithdrawResponse +import co.nilin.opex.wallet.core.inout.WithdrawResult +import co.nilin.opex.wallet.core.service.WithdrawService +import io.swagger.annotations.ApiResponse +import io.swagger.annotations.Example +import io.swagger.annotations.ExampleProperty +import org.springframework.web.bind.annotation.* +import java.math.BigDecimal + +@RestController +@RequestMapping("/admin") +class AdminController(private val withdrawService: WithdrawService) { + + @GetMapping("/withdraw") + @ApiResponse( + message = "OK", + code = 200, + examples = Example( + ExampleProperty( + value = "{ }", + mediaType = "application/json" + ) + ) + ) + suspend fun searchWithdraws( + @RequestParam("uuid", required = false) uuid: String?, + @RequestParam("withdraw_id", required = false) withdrawId: String?, + @RequestParam("currency", required = false) currency: String?, + @RequestParam("dest_transaction_ref", required = false) destTxRef: String?, + @RequestParam("dest_address", required = false) destAddress: String?, + @RequestParam("status", required = false) status: List? + ): List { + return withdrawService + .findByCriteria( + uuid, + withdrawId, + currency, + destTxRef, + destAddress, + status?.isEmpty() ?: true, + status ?: listOf("") + ) + } + + @PostMapping("/withdraw/{id}/reject") + @ApiResponse( + message = "OK", + code = 200, + examples = Example( + ExampleProperty( + value = "{ }", + mediaType = "application/json" + ) + ) + ) + suspend fun rejectWithdraw( + @PathVariable("id") withdrawId: String, + @RequestParam("statusReason") statusReason: String, + @RequestParam("destNote", required = false) destNote: String? + ): WithdrawResult { + return withdrawService.rejectWithdraw(WithdrawRejectCommand(withdrawId, statusReason, destNote)) + } + + @PostMapping("/withdraw/{id}/accept") + @ApiResponse( + message = "OK", + code = 200, + examples = Example( + ExampleProperty( + value = "{ }", + mediaType = "application/json" + ) + ) + ) + suspend fun acceptWithdraw( + @PathVariable("id") withdrawId: String, + @RequestParam("destTransactionRef", required = false) destTransactionRef: String?, + @RequestParam("destNote", required = false) destNote: String?, + @RequestParam("fee", required = false) fee: BigDecimal = BigDecimal.ZERO, + ): WithdrawResult { + return withdrawService.acceptWithdraw(WithdrawAcceptCommand(withdrawId, destTransactionRef, destNote, fee)) + } + +} \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WithdrawController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WithdrawController.kt index 527bc401c..20ad264ba 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WithdrawController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WithdrawController.kt @@ -1,6 +1,7 @@ package co.nilin.opex.wallet.app.controller import co.nilin.opex.wallet.app.dto.TransactionRequest +import co.nilin.opex.wallet.app.dto.WithdrawHistoryResponse import co.nilin.opex.wallet.core.inout.* import co.nilin.opex.wallet.core.service.WithdrawService import io.swagger.annotations.ApiResponse @@ -14,58 +15,10 @@ import java.time.LocalDateTime import java.time.ZoneId @RestController +@RequestMapping("/withdraw") class WithdrawController(private val withdrawService: WithdrawService) { - data class WithdrawHistoryResponse( - val withdrawId: Long? = null, - val uuid: String, - val amount: BigDecimal, - val acceptedFee: BigDecimal, - val appliedFee: BigDecimal?, - val destAmount: BigDecimal?, - val destCurrency: String?, - val destAddress: String?, - val destNetwork: String?, - var destNote: String?, - var destTransactionRef: String?, - val statusReason: String?, - val status: String, - val createDate: Long, - val acceptDate: Long? - ) - - @GetMapping("/admin/withdraw") - @ApiResponse( - message = "OK", - code = 200, - examples = Example( - ExampleProperty( - value = "{ }", - mediaType = "application/json" - ) - ) - ) - suspend fun searchWithdraws( - @RequestParam("uuid", required = false) uuid: String?, - @RequestParam("withdraw_id", required = false) withdrawId: String?, - @RequestParam("currency", required = false) currency: String?, - @RequestParam("dest_transaction_ref", required = false) destTxRef: String?, - @RequestParam("dest_address", required = false) destAddress: String?, - @RequestParam("status", required = false) status: List? - ): List { - return withdrawService - .findByCriteria( - uuid, - withdrawId, - currency, - destTxRef, - destAddress, - status?.isEmpty() ?: true, - status ?: listOf("") - ) - } - - @GetMapping("/withdraw") + @GetMapping @ApiResponse( message = "OK", code = 200, @@ -96,7 +49,7 @@ class WithdrawController(private val withdrawService: WithdrawService) { ) } - @PostMapping("/withdraw/{amount}_{symbol}") + @PostMapping("/{amount}_{symbol}") @ApiResponse( message = "OK", code = 200, @@ -127,48 +80,7 @@ class WithdrawController(private val withdrawService: WithdrawService) { ) } - @PostMapping( - "/admin/withdraw/{id}/reject" - ) - @ApiResponse( - message = "OK", - code = 200, - examples = Example( - ExampleProperty( - value = "{ }", - mediaType = "application/json" - ) - ) - ) - suspend fun rejectWithdraw( - @PathVariable("id") withdrawId: String, - @RequestParam("statusReason") statusReason: String, - @RequestParam("destNote", required = false) destNote: String? - ): WithdrawResult { - return withdrawService.rejectWithdraw(WithdrawRejectCommand(withdrawId, statusReason, destNote)) - } - - @PostMapping("/admin/withdraw/{id}/accept") - @ApiResponse( - message = "OK", - code = 200, - examples = Example( - ExampleProperty( - value = "{ }", - mediaType = "application/json" - ) - ) - ) - suspend fun acceptWithdraw( - @PathVariable("id") withdrawId: String, - @RequestParam("destTransactionRef", required = false) destTransactionRef: String?, - @RequestParam("destNote", required = false) destNote: String?, - @RequestParam("fee", required = false) fee: BigDecimal = BigDecimal.ZERO, - ): WithdrawResult { - return withdrawService.acceptWithdraw(WithdrawAcceptCommand(withdrawId, destTransactionRef, destNote, fee)) - } - - @PostMapping("/withdraw/history/{uuid}") + @PostMapping("/history/{uuid}") suspend fun getWithdrawTransactionsForUser( @PathVariable("uuid") uuid: String, @RequestBody request: TransactionRequest diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/dto/WithdrawHistoryResponse.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/dto/WithdrawHistoryResponse.kt new file mode 100644 index 000000000..f01016723 --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/dto/WithdrawHistoryResponse.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.wallet.app.dto + +import java.math.BigDecimal + +data class WithdrawHistoryResponse( + val withdrawId: Long? = null, + val uuid: String, + val amount: BigDecimal, + val acceptedFee: BigDecimal, + val appliedFee: BigDecimal?, + val destAmount: BigDecimal?, + val destCurrency: String?, + val destAddress: String?, + val destNetwork: String?, + var destNote: String?, + var destTransactionRef: String?, + val statusReason: String?, + val status: String, + val createDate: Long, + val acceptDate: Long? +) \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/AdminEventListenerImpl.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/AdminEventListenerImpl.kt new file mode 100644 index 000000000..2f3d6dde2 --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/AdminEventListenerImpl.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.wallet.app.listener + +import co.nilin.opex.wallet.core.spi.CurrencyService +import co.nilin.opex.wallet.ports.kafka.listener.model.AddCurrencyEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.DeleteCurrencyEvent +import co.nilin.opex.wallet.ports.kafka.listener.model.EditCurrencyEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.AdminEventListener +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class AdminEventListenerImpl(private val currencyService: CurrencyService) : AdminEventListener { + + private val logger = LoggerFactory.getLogger(AdminEventListenerImpl::class.java) + + override fun id() = "AdminEventListener" + + override fun onEvent(event: AdminEvent, partition: Int, offset: Long, timestamp: Long): Unit = runBlocking { + logger.info("Incoming admin event $event") + when (event) { + is AddCurrencyEvent -> currencyService.addCurrency(event.name, event.symbol, event.precision) + is EditCurrencyEvent -> currencyService.editCurrency(event.name, event.symbol, event.precision) + is DeleteCurrencyEvent -> currencyService.deleteCurrency(event.name) + else -> {} + } + } +} \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/WalletUserCreatedEventListener.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/WalletUserCreatedEventListener.kt new file mode 100644 index 000000000..2594a9475 --- /dev/null +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/listener/WalletUserCreatedEventListener.kt @@ -0,0 +1,26 @@ +package co.nilin.opex.wallet.app.listener + +import co.nilin.opex.wallet.app.service.UserRegistrationService +import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.UserCreatedEventListener +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class WalletUserCreatedEventListener(val userRegistrationService: UserRegistrationService) : UserCreatedEventListener { + + private val logger = LoggerFactory.getLogger(WalletUserCreatedEventListener::class.java) + + override fun id(): String { + return "UserCreatedEventListener" + } + + override fun onEvent(event: UserCreatedEvent, partition: Int, offset: Long, timestamp: Long) { + logger.info("Incoming UserCreated event: $event") + runBlocking { + userRegistrationService.registerNewUser(event) + } + logger.info("onUserCreatedEvent") + } +} \ No newline at end of file diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/Extensions.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/Extensions.kt index 31218d1dd..f689fe0e6 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/Extensions.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/utils/Extensions.kt @@ -1,19 +1,17 @@ package co.nilin.opex.wallet.app.utils import com.nimbusds.jose.shaded.json.JSONArray -import com.nimbusds.jose.shaded.json.JSONObject import org.springframework.security.authorization.AuthorizationDecision import org.springframework.security.config.web.server.ServerHttpSecurity import org.springframework.security.oauth2.jwt.Jwt -fun ServerHttpSecurity.AuthorizeExchangeSpec.Access.hasRealmRole( +fun ServerHttpSecurity.AuthorizeExchangeSpec.Access.hasRole( authority: String, role: String ): ServerHttpSecurity.AuthorizeExchangeSpec = access { mono, _ -> mono.map { auth -> - auth.authorities.any { it.authority == authority } - && (((auth.principal as Jwt).claims["realm_access"] as JSONObject)["roles"] as JSONArray).contains(role) - }.map { granted -> - AuthorizationDecision(granted) + val hasAuthority = auth.authorities.any { it.authority == authority } + val hasRole = ((auth.principal as Jwt).claims["roles"] as JSONArray?)?.contains(role) == true + AuthorizationDecision(hasAuthority && hasRole) } } \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Currency.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Currency.kt index 98a97b9d4..490a7d16c 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Currency.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/Currency.kt @@ -3,5 +3,5 @@ package co.nilin.opex.wallet.core.model interface Currency { fun getSymbol(): String fun getName(): String - fun getPrecision(): Int + fun getPrecision(): Double } \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/CurrencyService.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/CurrencyService.kt index 2773cf325..c17009c82 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/CurrencyService.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/CurrencyService.kt @@ -3,5 +3,12 @@ package co.nilin.opex.wallet.core.spi import co.nilin.opex.wallet.core.model.Currency interface CurrencyService { + suspend fun getCurrency(symbol: String): Currency? + + suspend fun addCurrency(name: String, symbol: String, precision: Double) + + suspend fun editCurrency(name: String, symbol: String, precision: Double) + + suspend fun deleteCurrency(name: String) } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt index 96e5e0e46..f3cba7bfc 100644 --- a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/config/WalletKafkaConfig.kt @@ -1,6 +1,8 @@ package co.nilin.opex.wallet.ports.kafka.listener.config +import co.nilin.opex.wallet.ports.kafka.listener.consumer.AdminEventKafkaListener import co.nilin.opex.wallet.ports.kafka.listener.consumer.UserCreatedKafkaListener +import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent import co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerConfig @@ -32,7 +34,7 @@ class WalletKafkaConfig { @Value("\${spring.kafka.consumer.group-id}") private val groupId: String? = null - @Bean("walletConsumerConfig") + @Bean fun consumerConfigs(): Map { return mapOf( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, @@ -40,15 +42,46 @@ class WalletKafkaConfig { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java, JsonDeserializer.TRUSTED_PACKAGES to "co.nilin.opex.*", - JsonDeserializer.TYPE_MAPPINGS to "user_created_event:co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent" + JsonDeserializer.TYPE_MAPPINGS to "user_created_event:co.nilin.opex.wallet.ports.kafka.listener.model.UserCreatedEvent,admin_add_currency:co.nilin.opex.wallet.ports.kafka.listener.model.AddCurrencyEvent,admin_edit_currency:co.nilin.opex.wallet.ports.kafka.listener.model.EditCurrencyEvent,admin_delete_currency:co.nilin.opex.wallet.ports.kafka.listener.model.DeleteCurrencyEvent" ) } @Bean("walletConsumerFactory") - fun consumerFactory(@Qualifier("walletConsumerConfig") consumerConfigs: Map): ConsumerFactory { + fun consumerFactory(consumerConfigs: Map): ConsumerFactory { return DefaultKafkaConsumerFactory(consumerConfigs) } + @Bean + fun adminEventsConsumerFactory(consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } + + @Autowired + @ConditionalOnBean(UserCreatedKafkaListener::class) + fun configureUserCreatedListener( + listener: UserCreatedKafkaListener, + @Qualifier("walletConsumerFactory") consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile("auth_user_created")) + containerProps.messageListener = listener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("UserCreatedKafkaListenerContainer") + container.start() + } + + @Autowired + @ConditionalOnBean(AdminEventKafkaListener::class) + fun configureAdminEventListener( + listener: AdminEventKafkaListener, + consumerFactory: ConsumerFactory + ) { + val containerProps = ContainerProperties(Pattern.compile("admin_event")) + containerProps.messageListener = listener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.setBeanName("AdminEventKafkaListenerContainer") + container.start() + } + @Bean("walletProducerConfig") fun producerConfigs(): Map { return mapOf( @@ -69,19 +102,6 @@ class WalletKafkaConfig { return KafkaTemplate(producerFactory) } - @Autowired - @ConditionalOnBean(UserCreatedKafkaListener::class) - fun configureUserCreatedListener( - listener: UserCreatedKafkaListener, - @Qualifier("walletConsumerFactory") consumerFactory: ConsumerFactory - ) { - val containerProps = ContainerProperties(Pattern.compile("auth_user_created")) - containerProps.messageListener = listener - val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) - container.setBeanName("UserCreatedKafkaListenerContainer") - container.start() - } - @Autowired fun createUserCreatedTopics(applicationContext: GenericApplicationContext) { applicationContext.registerBean("topic_auth_user_created", NewTopic::class.java, Supplier { diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/AdminEventKafkaListener.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/AdminEventKafkaListener.kt new file mode 100644 index 000000000..6cdd7e8f2 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/consumer/AdminEventKafkaListener.kt @@ -0,0 +1,27 @@ +package co.nilin.opex.wallet.ports.kafka.listener.consumer + +import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent +import co.nilin.opex.wallet.ports.kafka.listener.spi.AdminEventListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component + +@Component +class AdminEventKafkaListener : MessageListener { + + private val listeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + listeners.forEach { + it.onEvent(data.value(), data.partition(), data.offset(), data.timestamp()) + } + } + + fun addEventListener(tl: AdminEventListener) { + listeners.add(tl) + } + + fun removeEventListener(tl: AdminEventListener) { + listeners.removeIf { it.id() == tl.id() } + } +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/AddCurrencyEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/AddCurrencyEvent.kt new file mode 100644 index 000000000..8a5b2870b --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/AddCurrencyEvent.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +data class AddCurrencyEvent( + val name: String, + val symbol: String, + val precision: Double +) : AdminEvent() \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/AdminEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/AdminEvent.kt new file mode 100644 index 000000000..0944d6a6e --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/AdminEvent.kt @@ -0,0 +1,3 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +abstract class AdminEvent \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DeleteCurrencyEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DeleteCurrencyEvent.kt new file mode 100644 index 000000000..24ffae23b --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/DeleteCurrencyEvent.kt @@ -0,0 +1,3 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +data class DeleteCurrencyEvent(val name: String) : AdminEvent() \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/EditCurrencyEvent.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/EditCurrencyEvent.kt new file mode 100644 index 000000000..dba2c2ba6 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/model/EditCurrencyEvent.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.wallet.ports.kafka.listener.model + +data class EditCurrencyEvent( + val name: String, + val symbol: String, + val precision: Double +) : AdminEvent() \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/AdminEventListener.kt b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/AdminEventListener.kt new file mode 100644 index 000000000..3c8340c12 --- /dev/null +++ b/wallet/wallet-ports/wallet-eventlistener-kafka/src/main/kotlin/co/nilin/opex/wallet/ports/kafka/listener/spi/AdminEventListener.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.wallet.ports.kafka.listener.spi + +import co.nilin.opex.wallet.ports.kafka.listener.model.AdminEvent + +interface AdminEventListener { + + fun id(): String + + fun onEvent(event: AdminEvent, partition: Int, offset: Long, timestamp: Long) + +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/CurrencyRepository.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/CurrencyRepository.kt index 7b46de0f1..41e94d7f9 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/CurrencyRepository.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/CurrencyRepository.kt @@ -1,10 +1,18 @@ package co.nilin.opex.wallet.ports.postgres.dao import co.nilin.opex.wallet.ports.postgres.model.CurrencyModel +import org.springframework.data.r2dbc.repository.Query import org.springframework.data.repository.reactive.ReactiveCrudRepository import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono @Repository interface CurrencyRepository : ReactiveCrudRepository { + @Query("insert into currency values (:name, :symbol, :precision) on conflict do nothing") + fun insert(name: String, symbol: String, precision: Double): Mono + + @Query("delete from currency where name = :name") + fun deleteByName(name: String): Mono + } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/CurrencyServiceImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/CurrencyServiceImpl.kt index 986acb38a..75ec8b59f 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/CurrencyServiceImpl.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/CurrencyServiceImpl.kt @@ -3,13 +3,43 @@ package co.nilin.opex.wallet.ports.postgres.impl import co.nilin.opex.wallet.core.model.Currency import co.nilin.opex.wallet.core.spi.CurrencyService import co.nilin.opex.wallet.ports.postgres.dao.CurrencyRepository +import kotlinx.coroutines.reactive.awaitFirst import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingleOrNull +import org.slf4j.LoggerFactory import org.springframework.stereotype.Service @Service class CurrencyServiceImpl(val currencyRepository: CurrencyRepository) : CurrencyService { + private val logger = LoggerFactory.getLogger(CurrencyServiceImpl::class.java) + override suspend fun getCurrency(symbol: String): Currency? { return currencyRepository.findById(symbol).awaitFirstOrNull() } + + override suspend fun addCurrency(name: String, symbol: String, precision: Double) { + try { + currencyRepository.insert(name, symbol, precision).awaitSingleOrNull() + } catch (e: Exception) { + logger.error("Could not insert new currency $name", e) + } + } + + override suspend fun editCurrency(name: String, symbol: String, precision: Double) { + val currency = currencyRepository.findById(name).awaitFirstOrNull() + if (currency != null) { + currency.symbol_ = symbol + currency.precision_ = precision + currencyRepository.save(currency).awaitFirst() + } + } + + override suspend fun deleteCurrency(name: String) { + try { + currencyRepository.deleteByName(name).awaitFirstOrNull() + } catch (e: Exception) { + logger.error("Could not delete currency $name", e) + } + } } \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/CurrencyModel.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/CurrencyModel.kt index 9fa8a460d..cf4f253b0 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/CurrencyModel.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/CurrencyModel.kt @@ -10,9 +10,10 @@ import org.springframework.data.relational.core.mapping.Table @Table("currency") data class CurrencyModel( @JsonIgnore @Id @Column("name") val name_: String, - @JsonIgnore @Column("symbol") val symbol_: String, - @JsonIgnore @Column("precision") val precision_: Int + @JsonIgnore @Column("symbol") var symbol_: String, + @JsonIgnore @Column("precision") var precision_: Double ) : Currency { + override fun getSymbol(): String { return symbol_ } @@ -21,7 +22,7 @@ data class CurrencyModel( return name_ } - override fun getPrecision(): Int { + override fun getPrecision(): Double { return precision_ } }