diff --git a/Api/api-app/pom.xml b/Api/api-app/pom.xml index f430eb9c5..620cef3b9 100644 --- a/Api/api-app/pom.xml +++ b/Api/api-app/pom.xml @@ -208,6 +208,6 @@ - opex-accountant + opex-api diff --git a/Api/api-app/src/main/kotlin/co/nilin/opex/app/ApiApp.kt b/Api/api-app/src/main/kotlin/co/nilin/opex/app/ApiApp.kt index 338e2bab1..841b43d18 100644 --- a/Api/api-app/src/main/kotlin/co/nilin/opex/app/ApiApp.kt +++ b/Api/api-app/src/main/kotlin/co/nilin/opex/app/ApiApp.kt @@ -1,23 +1,9 @@ package co.nilin.opex.app -import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication -import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan -import org.springframework.security.core.annotation.AuthenticationPrincipal -import springfox.documentation.builders.* -import springfox.documentation.builders.PathSelectors.regex -import springfox.documentation.service.* -import springfox.documentation.spi.DocumentationType -import springfox.documentation.spi.service.contexts.SecurityContext -import springfox.documentation.spring.web.plugins.Docket -import springfox.documentation.swagger.web.SecurityConfiguration -import springfox.documentation.swagger.web.SecurityConfigurationBuilder import springfox.documentation.swagger2.annotations.EnableSwagger2 -import java.security.Principal -import java.util.Collections.singletonList - @SpringBootApplication @ComponentScan("co.nilin.opex") diff --git a/Api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/port/api/binance/config/SecurityConfig.kt b/Api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/port/api/binance/config/SecurityConfig.kt index 12b98b829..047c58dc5 100644 --- a/Api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/port/api/binance/config/SecurityConfig.kt +++ b/Api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/port/api/binance/config/SecurityConfig.kt @@ -30,6 +30,7 @@ class SecurityConfig(private val webClient: WebClient) { .pathMatchers("/v3/ticker/**").permitAll() .pathMatchers("/v3/exchangeInfo").permitAll() .pathMatchers("/v3/klines").permitAll() + .pathMatchers("/socket").permitAll() .pathMatchers("/**").hasAuthority("SCOPE_trust") .anyExchange().authenticated() .and() diff --git a/Deployment/docker-compose.yml b/Deployment/docker-compose.yml index 7f511f6a2..332a62ef0 100644 --- a/Deployment/docker-compose.yml +++ b/Deployment/docker-compose.yml @@ -306,21 +306,30 @@ services: deploy: restart_policy: condition: on-failure - nginx: - image: jboesl/docker-nginx-headers-more - container_name: opex_nginx - volumes: - - ./nginx.conf:/etc/nginx/nginx.conf - - $DATA/www:/data/www + websocket: + container_name: websocket + build: + context: ../Websocket/websocket-app + dockerfile: Dockerfile ports: - - 80:80 - depends_on: - - wallet - - auth - - matching-gateway - - api + - 127.0.0.1:8097:8097 + - 127.0.0.1:1054:1044 + environment: + - JAVA_OPTS=-Xmx256m -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=1044 + - SPRING_PROFILES_DEFAULT=docker + - KAFKA_IP_PORT=kafka:9092 + - CONSUL_HOST=consul + - DB_IP_PORT=postgres-api networks: - opex + depends_on: + - zookeeper + - kafka + - consul + - postgres-api + deploy: + restart_policy: + condition: on-failure bc-gateway: container_name: bc-gateway build: @@ -366,6 +375,21 @@ services: deploy: restart_policy: condition: on-failure + nginx: + image: jboesl/docker-nginx-headers-more + container_name: opex_nginx + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf + - $DATA/www:/data/www + ports: + - 80:80 + depends_on: + - wallet + - auth + - matching-gateway + - api + networks: + - opex networks: opex: driver: bridge diff --git a/Deployment/nginx.conf b/Deployment/nginx.conf index 22c252869..b88719230 100644 --- a/Deployment/nginx.conf +++ b/Deployment/nginx.conf @@ -27,6 +27,10 @@ http { server storage:8096; } + upstream docker-websocket { + server storage:8097; + } + proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -70,6 +74,11 @@ http { rewrite ^/storage/(.*)$ /$1 break; } + location /stream { + proxy_pass http://docker-websocket; + rewrite ^/stream/(.*)$ /$1 break; + } + location /api { proxy_pass http://docker-api; rewrite ^/api(.*)$ $1 break; diff --git a/Jenkins/Jenkinsfile.deploy.groovy b/Jenkins/Jenkinsfile.deploy.groovy index 2e336d280..29305ef4c 100644 --- a/Jenkins/Jenkinsfile.deploy.groovy +++ b/Jenkins/Jenkinsfile.deploy.groovy @@ -47,6 +47,10 @@ pipeline { dir("Storage") { sh 'mvn -B clean install' } + + dir("Websocket") { + sh 'mvn -B clean install' + } } } diff --git a/Websocket/pom.xml b/Websocket/pom.xml new file mode 100644 index 000000000..e619d6c40 --- /dev/null +++ b/Websocket/pom.xml @@ -0,0 +1,18 @@ + + + 4.0.0 + co.nilin.opex + websocket-root + 1.0-SNAPSHOT + websocket-root + pom + Websocket root module + + websocket-app + websocket-core + websocket-ports/websocket-eventlistener-kafka + websocket-ports/websocket-persister-postgres + + \ No newline at end of file diff --git a/Websocket/websocket-app/.gitignore b/Websocket/websocket-app/.gitignore new file mode 100644 index 000000000..549e00a2a --- /dev/null +++ b/Websocket/websocket-app/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/Websocket/websocket-app/Dockerfile b/Websocket/websocket-app/Dockerfile new file mode 100644 index 000000000..f2cbd4c26 --- /dev/null +++ b/Websocket/websocket-app/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:8-jdk-alpine +VOLUME /tmp +ARG JAR_FILE=target/*.jar +COPY ${JAR_FILE} app.jar +ENTRYPOINT ["sh", "-c", "java ${JAVA_OPTS} -jar /app.jar"] \ No newline at end of file diff --git a/Websocket/websocket-app/pom.xml b/Websocket/websocket-app/pom.xml new file mode 100644 index 000000000..c0e773b5e --- /dev/null +++ b/Websocket/websocket-app/pom.xml @@ -0,0 +1,230 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.4.4 + + + co.nilin.opex + websocket-app + 1.0-SNAPSHOT + websocket-app + Websocket app + + + 1.8 + 1.4.31 + 2020.0.2 + ${version} + ${version} + ${version} + ${version} + + + + + org.springframework.boot + spring-boot-starter-webflux + + + + org.springframework.boot + spring-boot-starter-oauth2-resource-server + + + + org.springframework.boot + spring-boot-starter-actuator + + + + org.springframework.cloud + spring-cloud-starter-consul-all + + + + org.springframework.boot + spring-boot-starter-websocket + + + + org.springframework.security + spring-security-messaging + + + + com.fasterxml.jackson.module + jackson-module-kotlin + + + io.projectreactor.kotlin + reactor-kotlin-extensions + + + org.jetbrains.kotlin + kotlin-reflect + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + + + + co.nilin.opex + accountant-core + ${accountant.version} + + + co.nilin.opex + websocket-core + ${websocket.version} + + + co.nilin.opex + websocket-eventlistener-kafka + ${websocket.version} + + + co.nilin.opex + websocket-persister-postgres + ${websocket.version} + + + + io.projectreactor + reactor-test + test + + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + ${project.basedir}/src/main/kotlin + ${project.basedir}/src/test/kotlin + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + 2.18 + + + ${skip.unit.tests} + + + **/*IntegrationTest.java + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-test-source + generate-test-sources + + add-test-source + + + + src/test/java + + + + + compile + + add-source + + + + src/main/java + + + + + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + + compile + compile + + compile + + + + test-compile + test-compile + + test-compile + + + + + + -Xjsr305=strict + + + spring + + 1.8 + + + + org.jetbrains.kotlin + kotlin-maven-allopen + ${kotlin.version} + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + compile + + compile + + + + testCompile + test-compile + + testCompile + + + + + + opex-websocket + + + diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/WebSocketApp.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/WebSocketApp.kt new file mode 100644 index 000000000..26150f8a6 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/WebSocketApp.kt @@ -0,0 +1,15 @@ +package co.nilin.opex.port.websocket + +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.runApplication +import org.springframework.context.annotation.ComponentScan +import org.springframework.scheduling.annotation.EnableScheduling + +@SpringBootApplication +@EnableScheduling +@ComponentScan("co.nilin.opex") +class WebSocketApp + +fun main(args: Array) { + runApplication(*args) +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/AppConfig.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/AppConfig.kt new file mode 100644 index 000000000..3effe507a --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/AppConfig.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.port.websocket.config + +import co.nilin.opex.port.websocket.kafka.consumer.OrderKafkaListener +import co.nilin.opex.port.websocket.kafka.consumer.TradeKafkaListener +import co.nilin.opex.port.websocket.listener.WebSocketKafkaListener +import co.nilin.opex.websocket.core.spi.EventStreamHandler +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration + +@Configuration +class AppConfig { + + @Autowired + fun configureListeners( + orderKafkaListener: OrderKafkaListener, + tradeKafkaListener: TradeKafkaListener, + appListener: WebSocketKafkaListener + ) { + orderKafkaListener.addOrderListener(appListener) + tradeKafkaListener.addTradeListener(appListener) + } + + @Bean + fun websocketListener(eventStreamHandler: EventStreamHandler): WebSocketKafkaListener { + return WebSocketKafkaListener(eventStreamHandler) + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/AppDispatchers.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/AppDispatchers.kt new file mode 100644 index 000000000..a6509e0b8 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/AppDispatchers.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.port.websocket.config + +import kotlinx.coroutines.asCoroutineDispatcher +import java.util.concurrent.Executors + +object AppDispatchers { + + val websocketExecutor = Executors.newFixedThreadPool(32).asCoroutineDispatcher() + + val kafkaExecutor = Executors.newSingleThreadExecutor().asCoroutineDispatcher() +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/WebSecurityConfig.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/WebSecurityConfig.kt new file mode 100644 index 000000000..d1bfa356d --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/config/WebSecurityConfig.kt @@ -0,0 +1,41 @@ +package co.nilin.opex.port.websocket.config + +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.security.config.annotation.web.builders.HttpSecurity +import org.springframework.security.config.annotation.web.builders.WebSecurity +import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter +import org.springframework.security.config.http.SessionCreationPolicy +import org.springframework.security.oauth2.jwt.JwtDecoder +import org.springframework.security.oauth2.jwt.NimbusJwtDecoder +import org.springframework.web.reactive.function.client.WebClient + +@Configuration +class WebSecurityConfig : WebSecurityConfigurerAdapter() { + + @Value("\${app.auth.cert-url}") + private lateinit var jwkUrl: String + + override fun configure(web: WebSecurity) { + web.ignoring().antMatchers("/actuator/health") + } + + override fun configure(http: HttpSecurity) { + http.httpBasic().disable() + .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS).and() + .authorizeRequests() + .antMatchers("/ws/**").permitAll() + .anyRequest().denyAll() + .and() + .oauth2ResourceServer() + .jwt() + } + + @Bean + @Throws(Exception::class) + fun jwtDecoder(): JwtDecoder { + return NimbusJwtDecoder.withJwkSetUri(jwkUrl).build() + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/controller/MarketController.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/controller/MarketController.kt new file mode 100644 index 000000000..f94000ce4 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/controller/MarketController.kt @@ -0,0 +1,48 @@ +package co.nilin.opex.port.websocket.controller + +import co.nilin.opex.port.websocket.service.MarketDestinationType +import co.nilin.opex.port.websocket.service.MarketStreamHandler +import org.springframework.messaging.handler.annotation.DestinationVariable +import org.springframework.messaging.handler.annotation.MessageMapping +import org.springframework.messaging.handler.annotation.Payload +import org.springframework.messaging.simp.annotation.SubscribeMapping +import org.springframework.stereotype.Controller + +@Controller +class MarketController(private val handler: MarketStreamHandler) { + + private val validDurations = arrayOf("24h", "7d", "1M") + + @SubscribeMapping("/market/depth/{symbol}") + fun requestOrderBook(@DestinationVariable("symbol") symbol: String) { + handler.newSubscription(MarketDestinationType.Depth(symbol)) + } + + @SubscribeMapping("/market/price") + fun requestPrice() { + handler.newSubscription(MarketDestinationType.Price) + } + + @SubscribeMapping("/market/overview/{symbol}-{duration}") + fun requestOverview( + @DestinationVariable("symbol") symbol: String, + @DestinationVariable("duration") duration: String + ) { + if (validDurations.contains(duration)) + handler.newSubscription(MarketDestinationType.Overview(symbol, duration)) + } + + @SubscribeMapping("/market/kline/{symbol}-{interval}") + fun requestCandleData( + @DestinationVariable("symbol") symbol: String, + @DestinationVariable("interval") interval: String + ) { + handler.newSubscription(MarketDestinationType.Candle(symbol, interval)) + } + + @SubscribeMapping("/market/recent-trades/{symbol}") + fun requestRecentTrades(@DestinationVariable("symbol") symbol: String) { + handler.newSubscription(MarketDestinationType.RecentTrades(symbol)) + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/DepthResponse.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/DepthResponse.kt new file mode 100644 index 000000000..f3807c9ff --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/DepthResponse.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.port.websocket.dto + +import java.math.BigDecimal + +data class DepthResponse( + val lastUpdateId: Long, + val bids: List>, // Inner list -> [0]: PRICE, [1]: QTY + val asks: List> // Inner list -> [0]: PRICE, [1]: QTY +) \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/Interval.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/Interval.kt new file mode 100644 index 000000000..d18011976 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/Interval.kt @@ -0,0 +1,43 @@ +package co.nilin.opex.port.websocket.dto + +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneId +import java.util.* +import java.util.concurrent.TimeUnit + +enum class Interval(val label: String, val unit: TimeUnit, val duration: Long) { + + Minute("1m", TimeUnit.MINUTES, 1), + ThreeMinutes("3m", TimeUnit.MINUTES, 3), + FiveMinutes("5m", TimeUnit.MINUTES, 5), + FifteenMinutes("15m", TimeUnit.MINUTES, 15), + ThirtyMinutes("30m", TimeUnit.MINUTES, 30), + Hour("1h", TimeUnit.HOURS, 1), + TwoHours("2h", TimeUnit.HOURS, 2), + FourHours("4h", TimeUnit.HOURS, 4), + SixHours("6h", TimeUnit.HOURS, 6), + EightHours("8h", TimeUnit.HOURS, 8), + TwelveHours("12h", TimeUnit.HOURS, 12), + TwentyFourHours("24h", TimeUnit.HOURS, 24), + Day("1d", TimeUnit.DAYS, 1), + ThreeDays("3d", TimeUnit.DAYS, 3), + Week("1w", TimeUnit.DAYS, 7), + Month("1M", TimeUnit.DAYS, 31), + ThreeMonth("3M",TimeUnit.DAYS, 90); + + private fun getOffsetTime() = unit.toMillis(duration) + + fun getDate() = Date(Date().time - getOffsetTime()) + + fun getLocalDateTime(): LocalDateTime = with(Instant.ofEpochMilli(getDate().time)) { + LocalDateTime.ofInstant(this, ZoneId.systemDefault()) + } + + companion object { + fun findByLabel(label: String): Interval? { + return values().find { it.label == label } + } + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/OrderBookResponse.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/OrderBookResponse.kt new file mode 100644 index 000000000..91444b51f --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/OrderBookResponse.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.port.websocket.dto + +import java.math.BigDecimal + +data class OrderBookResponse( + val lastUpdateId: Long, + val bids: List>, // Inner list -> [0]: PRICE, [1]: QTY + val asks: List> // Inner list -> [0]: PRICE, [1]: QTY +) \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/OrderResponse.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/OrderResponse.kt new file mode 100644 index 000000000..a5eda691a --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/OrderResponse.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.port.websocket.dto + +import co.nilin.opex.websocket.core.inout.OrderSide +import co.nilin.opex.websocket.core.inout.OrderStatus +import co.nilin.opex.websocket.core.inout.OrderType +import co.nilin.opex.websocket.core.inout.TimeInForce +import java.math.BigDecimal +import java.util.* + +data class OrderResponse( + val symbol: String, + val orderId: Long, + val orderListId: Long, //Unless part of an OCO, the value will always be -1. + val clientOrderId: String?, + val price: BigDecimal, + val origQty: BigDecimal, + val executedQty: BigDecimal, + val cummulativeQuoteQty: BigDecimal, + val status: OrderStatus, + val timeInForce: TimeInForce, + val type: OrderType, + val side: OrderSide, + val stopPrice: BigDecimal?, + val icebergQty: BigDecimal?, + val time: Date, + val updateTime: Date, + val isWorking: Boolean, + val origQuoteOrderQty: BigDecimal +) diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/RecentTradeResponse.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/RecentTradeResponse.kt new file mode 100644 index 000000000..310512ab5 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/RecentTradeResponse.kt @@ -0,0 +1,15 @@ +package co.nilin.opex.port.websocket.dto + +import java.math.BigDecimal +import java.util.* + +data class RecentTradeResponse( + val symbol: String, + val id: Long, + val price: BigDecimal, + val qty: BigDecimal, + val quoteQty: BigDecimal, + val time: Date, + val isBestMatch: Boolean, + val isMakerBuyer: Boolean +) diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/TradeResponse.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/TradeResponse.kt new file mode 100644 index 000000000..0b840aec4 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/dto/TradeResponse.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.port.websocket.dto + +import java.math.BigDecimal +import java.util.* + +data class TradeResponse( + val symbol: String, + val id: Long, + val orderId: Long, + val orderListId: Long = -1, + val price: BigDecimal, + val qty: BigDecimal, + val quoteQty: BigDecimal, + val commission: BigDecimal, + val commissionAsset: String, + val time: Date, + val isBuyer: Boolean, + val isMaker: Boolean, + val isBestMatch: Boolean, + val isMakerBuyer: Boolean +) \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/listener/WebSocketKafkaListener.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/listener/WebSocketKafkaListener.kt new file mode 100644 index 000000000..59b50d7b8 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/listener/WebSocketKafkaListener.kt @@ -0,0 +1,34 @@ +package co.nilin.opex.port.websocket.listener + +import co.nilin.opex.accountant.core.inout.RichOrder +import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.port.websocket.config.AppDispatchers +import co.nilin.opex.port.websocket.kafka.spi.RichOrderListener +import co.nilin.opex.port.websocket.kafka.spi.RichTradeListener +import co.nilin.opex.websocket.core.spi.EventStreamHandler +import kotlinx.coroutines.runBlocking + +class WebSocketKafkaListener(private val handler: EventStreamHandler) : RichTradeListener, RichOrderListener { + + override fun id(): String { + return "WebSocketKafkaListener" + } + + override fun onTrade( + trade: RichTrade, + partition: Int, + offset: Long, + timestamp: Long + ) { + handler.handleTrade(trade) + } + + override fun onOrder( + order: RichOrder, + partition: Int, + offset: Long, + timestamp: Long + ) { + handler.handleOrder(order) + } +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/EventStreamHandlerImpl.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/EventStreamHandlerImpl.kt new file mode 100644 index 000000000..918e5eeab --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/EventStreamHandlerImpl.kt @@ -0,0 +1,121 @@ +package co.nilin.opex.port.websocket.service + +import co.nilin.opex.accountant.core.inout.RichOrder +import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.matching.core.model.OrderDirection +import co.nilin.opex.port.websocket.config.AppDispatchers +import co.nilin.opex.port.websocket.dto.OrderResponse +import co.nilin.opex.port.websocket.postgres.dao.OrderRepository +import co.nilin.opex.port.websocket.postgres.model.OrderModel +import co.nilin.opex.port.websocket.utils.* +import co.nilin.opex.websocket.core.inout.OrderStatus +import co.nilin.opex.websocket.core.inout.TradeResponse +import co.nilin.opex.websocket.core.spi.EventStreamHandler +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.springframework.messaging.simp.SimpMessagingTemplate +import org.springframework.messaging.simp.user.SimpUserRegistry +import org.springframework.stereotype.Component +import java.math.BigDecimal +import java.time.ZoneId +import java.util.* + +@Component +class EventStreamHandlerImpl( + private val template: SimpMessagingTemplate, + private val orderRepository: OrderRepository, + private val registry: SimpUserRegistry +) : EventStreamHandler { + + override fun handleOrder(order: RichOrder) { + val response = OrderResponse( + order.pair, + order.orderId ?: -1, + -1, + null, + order.price, + order.quantity, + order.executedQuantity, + order.accumulativeQuoteQty, + order.status.toOrderStatus(), + order.constraint.toTimeInForce(), + order.type.toWebsocketOrderType(), + order.direction.toOrderSide(), + null, + null, + Date(), + Date(), + order.status.toOrderStatus().isWorking(), + order.quoteQuantity + ) + run { template.convertAndSendToUser(order.uuid, EventDestinations.Order.path, response) } + } + + override fun handleTrade(trade: RichTrade) { + run { + val takerOrder = orderRepository.findByOuid(trade.takerOuid).awaitFirstOrNull() + val makerOrder = orderRepository.findByOuid(trade.makerOuid).awaitFirstOrNull() + if (makerOrder==null ||takerOrder==null) + return@run + + val maker = trade.buildTradeResponse(trade.makerUuid, makerOrder, takerOrder) + val taker = trade.buildTradeResponse(trade.takerUuid, makerOrder, takerOrder) + template.convertAndSendToUser(trade.makerUuid, EventDestinations.Trade.path, maker) + template.convertAndSendToUser(trade.takerUuid, EventDestinations.Trade.path, taker) + } + } + + private fun RichTrade.buildTradeResponse( + uuid: String, + makerOrder: OrderModel, + takerOrder: OrderModel + ): TradeResponse { + val isMakerBuyer = makerOrder.direction == OrderDirection.BID + return TradeResponse( + pair, + id, + if (takerUuid == uuid) takerOrder.orderId!! else makerOrder.orderId!!, + -1, + if (takerUuid == uuid) takerPrice else makerPrice, + matchedQuantity, + if (isMakerBuyer) + makerOrder.quoteQuantity?.toBigDecimal() ?: BigDecimal.ZERO + else + takerOrder.quoteQuantity?.toBigDecimal() ?: BigDecimal.ZERO, + if (takerUuid == uuid) takerCommision else makerCommision, + if (takerUuid == uuid) takerCommisionAsset else makerCommisionAsset, + Date(), + if (takerUuid == uuid) + OrderDirection.ASK == takerOrder.direction + else + OrderDirection.ASK == makerOrder.direction, + makerUuid == uuid, + true, + isMakerBuyer + ) + } + + private fun run(action: suspend () -> Unit) { + runBlocking(AppDispatchers.websocketExecutor) { + try { + action() + } catch (e: Exception) { + e.printStackTrace() + } + } + } + + enum class EventDestinations(val path: String) { + Order("/secured/queue/orders"), + Trade("/secured/queue/trades"); + + companion object { + fun findByPath(path: String): EventDestinations? { + return values().find { it.path == path } + } + } + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketDestinationType.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketDestinationType.kt new file mode 100644 index 000000000..9e6d5fcb8 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketDestinationType.kt @@ -0,0 +1,18 @@ +package co.nilin.opex.port.websocket.service + +sealed class MarketDestinationType(val base: String, val path: String) { + + data class Depth(val symbol: String) : + MarketDestinationType("/market/depth", "/topic/market/depth/$symbol") + + object Price : MarketDestinationType("/market/price", "/topic/market/price") + + data class Overview(val symbol: String, val duration: String) : + MarketDestinationType("/market/overview", "/topic/market/overview/$symbol-$duration") + + data class Candle(val symbol: String, val interval: String) : + MarketDestinationType("/market/kline", "/topic/market/kline/$symbol-$interval") + + data class RecentTrades(val symbol: String) : + MarketDestinationType("/market/recent-trades", "/topic/market/recent-trades/$symbol") +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketService.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketService.kt new file mode 100644 index 000000000..8eab79bfd --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketService.kt @@ -0,0 +1,97 @@ +package co.nilin.opex.port.websocket.service + +import co.nilin.opex.port.websocket.dto.DepthResponse +import co.nilin.opex.port.websocket.dto.Interval +import co.nilin.opex.port.websocket.dto.RecentTradeResponse +import co.nilin.opex.websocket.core.inout.PriceChangeResponse +import co.nilin.opex.websocket.core.inout.PriceTickerResponse +import co.nilin.opex.websocket.core.spi.MarketQueryHandler +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import org.springframework.stereotype.Service +import java.math.BigDecimal +import java.time.ZoneId + +@Service +class MarketService(private val marketQueryHandler: MarketQueryHandler) { + + suspend fun getOrderBookDepth(symbol: String): DepthResponse { + val mappedBidOrders = ArrayList>() + val mappedAskOrders = ArrayList>() + + val bidOrders = marketQueryHandler.openBidOrders(symbol, 500) + val askOrders = marketQueryHandler.openAskOrders(symbol, 500) + + bidOrders.forEach { + val mapped = arrayListOf().apply { + add(it.price ?: BigDecimal.ZERO) + add(it.quantity ?: BigDecimal.ZERO) + } + mappedBidOrders.add(mapped) + } + + askOrders.forEach { + val mapped = arrayListOf().apply { + add(it.price ?: BigDecimal.ZERO) + add(it.quantity ?: BigDecimal.ZERO) + } + mappedAskOrders.add(mapped) + } + + val lastOrder = marketQueryHandler.lastOrder(symbol) + return DepthResponse(lastOrder?.orderId ?: -1, mappedBidOrders, mappedAskOrders) + } + + suspend fun getCandleData(symbol: String, duration: String): List> { + val i = Interval.findByLabel(duration) ?: return emptyList() + + val list = ArrayList>() + marketQueryHandler.getCandleInfo(symbol, "${i.duration} ${i.unit}", null, null, 500) + .forEach { + list.add( + arrayListOf( + it.openTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), + it.open.toString(), + it.high.toString(), + it.low.toString(), + it.close.toString(), + it.volume.toString(), + it.closeTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), + it.quoteAssetVolume.toString(), + it.trades, + it.takerBuyBaseAssetVolume.toString(), + it.takerBuyQuoteAssetVolume.toString(), + "0.0" + ) + ) + } + return list + } + + suspend fun getPriceChange(): List { + return marketQueryHandler.lastPrice(null) + } + + suspend fun getPriceOverview(symbol: String, duration: String): List { + val startDate = Interval.findByLabel(duration)?.getLocalDateTime() ?: Interval.Day.getLocalDateTime() + return listOf(marketQueryHandler.getTradeTickerDataBySymbol(symbol, startDate)) + } + + suspend fun getRecentTrades(symbol: String): List { + return marketQueryHandler.recentTrades(symbol, 500) + .map { + RecentTradeResponse( + it.symbol, + it.id, + it.price, + it.qty, + it.quoteQty, + it.time, + it.isBestMatch, + it.isMakerBuyer + ) + } + .toList() + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketStreamHandler.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketStreamHandler.kt new file mode 100644 index 000000000..f1ba2b546 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/MarketStreamHandler.kt @@ -0,0 +1,41 @@ +package co.nilin.opex.port.websocket.service + +import co.nilin.opex.port.websocket.dto.Interval +import co.nilin.opex.port.websocket.service.stream.IntervalStreamHandler +import co.nilin.opex.port.websocket.service.stream.StreamJob +import org.springframework.messaging.simp.SimpMessagingTemplate +import org.springframework.messaging.simp.user.SimpUserRegistry +import org.springframework.stereotype.Component +import java.util.concurrent.TimeUnit + +@Component +class MarketStreamHandler( + private val marketService: MarketService, + template: SimpMessagingTemplate, + userRegistry: SimpUserRegistry +) : IntervalStreamHandler(template, userRegistry) { + + override fun getPath(type: MarketDestinationType) = type.path + + override fun createJob(type: MarketDestinationType) = when (type) { + is MarketDestinationType.Depth -> StreamJob(2, TimeUnit.SECONDS) { + marketService.getOrderBookDepth(type.symbol) + } + is MarketDestinationType.Price -> StreamJob(2, TimeUnit.SECONDS) { + marketService.getPriceChange() + } + is MarketDestinationType.Candle -> { + val i = Interval.findByLabel(type.interval) + StreamJob(i?.duration ?: 2, i?.unit ?: TimeUnit.SECONDS) { + marketService.getCandleData(type.symbol, type.interval) + } + } + is MarketDestinationType.Overview -> StreamJob(2, TimeUnit.SECONDS) { + marketService.getPriceOverview(type.symbol, type.duration) + } + is MarketDestinationType.RecentTrades -> StreamJob(2, TimeUnit.SECONDS) { + marketService.getRecentTrades(type.symbol) + } + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/stream/IntervalStreamHandler.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/stream/IntervalStreamHandler.kt new file mode 100644 index 000000000..6bb51464c --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/stream/IntervalStreamHandler.kt @@ -0,0 +1,111 @@ +package co.nilin.opex.port.websocket.service.stream + +import co.nilin.opex.port.websocket.config.AppDispatchers +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory +import org.springframework.messaging.simp.SimpMessagingTemplate +import org.springframework.messaging.simp.user.SimpUserRegistry +import org.springframework.scheduling.annotation.Scheduled +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.ScheduledThreadPoolExecutor + +abstract class IntervalStreamHandler( + protected val template: SimpMessagingTemplate, + private val userRegistry: SimpUserRegistry +) { + + private val streamJobs = hashMapOf() + private val jobs = hashMapOf?>() + private val intervalExecutor = ScheduledThreadPoolExecutor(1) + private val governorExecutor = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + private val logger = LoggerFactory.getLogger(IntervalStreamHandler::class.java) + + init { + intervalExecutor.removeOnCancelPolicy = true + } + + fun newSubscription(type: T) { + registerJob(type) + logger.info("New subscription added for $type") + } + + private fun registerJob(type: T) { + runGovernor { + if (streamJobs[type] == null) + streamJobs[type] = createJob(type) + + val job = streamJobs[type] ?: return@runGovernor + if (jobs[type] == null || jobs[type]?.isCancelled == true) { + logger.info("job running for $type") + jobs[type] = intervalExecutor.scheduleAtFixedRate( + { job.run(type) }, + 0, + job.interval, + job.timeUnit + ) + } + } + } + + private fun StreamJob.run(type: T) { + runBlocking(AppDispatchers.websocketExecutor) { + val data = runnable() + template.convertAndSend(getPath(type), data) + } + } + + @Scheduled(fixedDelay = 60 * 1000) + private fun govern() { + runGovernor { + jobs.entries.forEach { j -> + val job = j.value + val count = userRegistry.findSubscriptions { it.destination == getPath(j.key) }.count() + if (count == 0) { + if (job?.isCancelled == false) { + logger.info("No subscriber for ${j.key}. task stopped") + job.cancel(false) + } + } else { + if (job == null || job.isCancelled) { + streamJobs[j.key]?.let { s -> + jobs[j.key] = intervalExecutor.scheduleAtFixedRate( + { s.run(j.key) }, + 0, + s.interval, + s.timeUnit + ) + } + logger.info("Starting job") + } + } + } + } + } + + private fun runGovernor(runnable: () -> Unit) { + runBlocking(governorExecutor) { runnable() } + } + + protected fun hasSubscription(): Boolean { + return userRegistry.users.isNotEmpty() + } + + protected fun hasSubscriptionFor(type: T): Boolean { + return userRegistry.findSubscriptions { it.destination == getPath(type) }.isNotEmpty() + } + + protected fun hasSubscriptionForAny(vararg paths: String): Boolean { + return userRegistry.findSubscriptions { paths.contains(it.destination) }.isNotEmpty() + } + + protected fun hasSession(): Boolean { + return userRegistry.users.map { it.sessions }.flatten().isNotEmpty() + } + + protected abstract fun createJob(type: T): StreamJob + + protected abstract fun getPath(type: T): String + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/stream/StreamJob.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/stream/StreamJob.kt new file mode 100644 index 000000000..69d544936 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/service/stream/StreamJob.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.port.websocket.service.stream + +import java.util.concurrent.TimeUnit + +data class StreamJob( + val interval: Long, + val timeUnit: TimeUnit, + val runnable: suspend () -> Any +) \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/AuthInterceptor.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/AuthInterceptor.kt new file mode 100644 index 000000000..0efb26c6c --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/AuthInterceptor.kt @@ -0,0 +1,49 @@ +package co.nilin.opex.port.websocket.socket + +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.messaging.Message +import org.springframework.messaging.MessageChannel +import org.springframework.messaging.simp.stomp.StompCommand +import org.springframework.messaging.simp.stomp.StompHeaderAccessor +import org.springframework.messaging.support.ChannelInterceptor +import org.springframework.messaging.support.MessageHeaderAccessor +import org.springframework.security.authentication.UsernamePasswordAuthenticationToken +import org.springframework.security.core.authority.SimpleGrantedAuthority +import org.springframework.security.oauth2.jwt.JwtDecoder +import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter +import org.springframework.stereotype.Component + +@Component +class AuthInterceptor : ChannelInterceptor { + + private val logger = LoggerFactory.getLogger(ChannelInterceptor::class.java) + + @Autowired + private lateinit var jwtDecoder: JwtDecoder + private val converter = JwtAuthenticationConverter() + + override fun preSend(message: Message<*>, channel: MessageChannel): Message<*> { + val accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor::class.java) + if (accessor?.command == StompCommand.CONNECT) { + val authorization = accessor.getNativeHeader("Authorization") + logger.debug("Authorization: $authorization") + + if (authorization.isNullOrEmpty()) { + accessor.user = UsernamePasswordAuthenticationToken( + "anonymous", + "N/A", + arrayListOf(SimpleGrantedAuthority("anonymous")) + ) + return message + } + + val token = authorization[0] + val jwt = jwtDecoder.decode(token) + val auth = converter.convert(jwt) + accessor.user = auth + } + return message + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/StompEventsConfig.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/StompEventsConfig.kt new file mode 100644 index 000000000..8e6e772a7 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/StompEventsConfig.kt @@ -0,0 +1,42 @@ +package co.nilin.opex.port.websocket.socket + +import org.springframework.context.ApplicationListener +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent +import org.springframework.web.socket.messaging.* + +@Configuration +class StompEventsConfig { + + @Bean + fun brokerAvailabilityListener() = ApplicationListener { event -> + println("Is broker available: ${event.isBrokerAvailable}") + } + + @Bean + fun sessionConnectListener() = ApplicationListener { event -> + println("* session connect received: ${event.message}") + } + + @Bean + fun sessionConnectedListener() = ApplicationListener { event -> + println("* connected: ${event.message}") + } + + @Bean + fun sessionDisconnectedListener() = ApplicationListener { event -> + println("* disconnected: ${event.message}") + } + + @Bean + fun sessionSubscribeListener() = ApplicationListener { event -> + println("* subscribed: ${event.message}") + } + + @Bean + fun sessionUnsubscribeEventListener() = ApplicationListener { event -> + println("- unsubscribed: ${event.message}") + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketAuthenticationConfig.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketAuthenticationConfig.kt new file mode 100644 index 000000000..c73916e40 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketAuthenticationConfig.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.port.websocket.socket + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Configuration +import org.springframework.core.Ordered +import org.springframework.core.annotation.Order +import org.springframework.messaging.simp.config.ChannelRegistration +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer + +@Configuration +@Order(Ordered.HIGHEST_PRECEDENCE + 99) +class WebSocketAuthenticationConfig : WebSocketMessageBrokerConfigurer { + + @Autowired + private lateinit var authInterceptor: AuthInterceptor + + override fun configureClientInboundChannel(registration: ChannelRegistration) { + registration.interceptors(authInterceptor) + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketAuthorizationConfig.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketAuthorizationConfig.kt new file mode 100644 index 000000000..304a2b58f --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketAuthorizationConfig.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.port.websocket.socket + +import org.springframework.context.annotation.Configuration +import org.springframework.security.config.annotation.web.messaging.MessageSecurityMetadataSourceRegistry +import org.springframework.security.config.annotation.web.socket.AbstractSecurityWebSocketMessageBrokerConfigurer + +@Configuration +class WebSocketAuthorizationConfig : AbstractSecurityWebSocketMessageBrokerConfigurer() { + + override fun configureInbound(messages: MessageSecurityMetadataSourceRegistry) { + with(messages) { + simpDestMatchers("/secured/**").hasAuthority("SCOPE_trust") + anyMessage().permitAll() + } + } + + override fun sameOriginDisabled(): Boolean { + return true + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketConfig.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketConfig.kt new file mode 100644 index 000000000..db18d95a2 --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/socket/WebSocketConfig.kt @@ -0,0 +1,27 @@ +package co.nilin.opex.port.websocket.socket + +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.simp.config.MessageBrokerRegistry +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker +import org.springframework.web.socket.config.annotation.StompEndpointRegistry +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer + +@Configuration +@EnableWebSocketMessageBroker +class WebSocketConfig : WebSocketMessageBrokerConfigurer { + + override fun registerStompEndpoints(registry: StompEndpointRegistry) { + registry.addEndpoint("/ws") + .setAllowedOriginPatterns("*") + .withSockJS() + } + + override fun configureMessageBroker(registry: MessageBrokerRegistry) { + with(registry) { + enableSimpleBroker("/topic", "/secured/queue") + setApplicationDestinationPrefixes("/app", "/topic") + //setUserDestinationPrefix("/secured/user") + } + } + +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/utils/EnumExtensions.kt b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/utils/EnumExtensions.kt new file mode 100644 index 000000000..959e714cb --- /dev/null +++ b/Websocket/websocket-app/src/main/kotlin/co/nilin/opex/port/websocket/utils/EnumExtensions.kt @@ -0,0 +1,45 @@ +package co.nilin.opex.port.websocket.utils + +import co.nilin.opex.websocket.core.inout.OrderSide +import co.nilin.opex.websocket.core.inout.OrderStatus +import co.nilin.opex.websocket.core.inout.TimeInForce +import co.nilin.opex.matching.core.model.MatchConstraint +import co.nilin.opex.matching.core.model.OrderDirection +import co.nilin.opex.matching.core.model.OrderType + +fun MatchConstraint.toTimeInForce(): TimeInForce { + if (this == MatchConstraint.FOK_BUDGET) + return TimeInForce.FOK + if (this == MatchConstraint.IOC_BUDGET) + return TimeInForce.IOC + return TimeInForce.valueOf(this.name) +} + + +fun TimeInForce.toMatchConstraint(): MatchConstraint { + return MatchConstraint.valueOf(this.name) +} + +fun OrderType.toWebsocketOrderType(): co.nilin.opex.websocket.core.inout.OrderType { + if (this == OrderType.LIMIT_ORDER) + return co.nilin.opex.websocket.core.inout.OrderType.LIMIT + if (this == OrderType.MARKET_ORDER) + return co.nilin.opex.websocket.core.inout.OrderType.MARKET + throw IllegalArgumentException("OrderType $this is not supported!") +} + +fun OrderDirection.toOrderSide(): OrderSide { + if (this == OrderDirection.BID) + return OrderSide.BUY + return OrderSide.SELL +} + +fun OrderStatus.isWorking(): Boolean { + return listOf(OrderStatus.NEW, OrderStatus.PARTIALLY_FILLED).contains(this) +} + +fun Int.toOrderStatus(): OrderStatus { + val status = co.nilin.opex.accountant.core.inout.OrderStatus.values() + .find { s -> s.code == this } + return OrderStatus.valueOf(status!!.name) +} \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/resources/application-docker.yml b/Websocket/websocket-app/src/main/resources/application-docker.yml new file mode 100644 index 000000000..81262acea --- /dev/null +++ b/Websocket/websocket-app/src/main/resources/application-docker.yml @@ -0,0 +1,18 @@ +spring: + kafka: + bootstrap-servers: ${KAFKA_IP_PORT} + redis: + host: ${REDIS_HOST} + r2dbc: + url: r2dbc:postgresql://${DB_IP_PORT}/opex_api + username: opex + password: hiopex + cloud: + consul: + host: ${CONSUL_HOST} + port: 8500 + main: + allow-bean-definition-overriding: true +app: + auth: + cert-url: http://auth:8083/auth/realms/opex/protocol/openid-connect/certs \ No newline at end of file diff --git a/Websocket/websocket-app/src/main/resources/application.yml b/Websocket/websocket-app/src/main/resources/application.yml new file mode 100644 index 000000000..e7df088d6 --- /dev/null +++ b/Websocket/websocket-app/src/main/resources/application.yml @@ -0,0 +1,28 @@ +server: + port: 8097 +spring: + application: + name: opex-websocket + main: + allow-bean-definition-overriding: false + kafka: + bootstrap-servers: 192.168.178.29:9092 + consumer: + group-id: websocket + r2dbc: + url: r2dbc:postgresql://localhost/opex_api + username: opex + password: hiopex + initialization-mode: always + cloud: + bootstrap: + enabled: true + consul: + port: 8500 + discovery: + instance-id: ${spring.application.name}:${server.port} + healthCheckInterval: 20s + prefer-ip-address: true +app: + auth: + cert-url: http://localhost:8083/auth/realms/opex/protocol/openid-connect/certs \ No newline at end of file diff --git a/Websocket/websocket-core/.gitignore b/Websocket/websocket-core/.gitignore new file mode 100644 index 000000000..f3a8317d6 --- /dev/null +++ b/Websocket/websocket-core/.gitignore @@ -0,0 +1,4 @@ +*.iml +target/ +.mvn/ +.idea/ \ No newline at end of file diff --git a/Websocket/websocket-core/pom.xml b/Websocket/websocket-core/pom.xml new file mode 100644 index 000000000..fe5f9c8ee --- /dev/null +++ b/Websocket/websocket-core/pom.xml @@ -0,0 +1,104 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.4.4 + + + co.nilin.opex + websocket-core + 1.0-SNAPSHOT + websocket-core + + + 1.8 + 1.4.31 + ${version} + ${version} + + + + + org.springframework.boot + spring-boot-starter + + + org.jetbrains.kotlin + kotlin-reflect + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + + io.projectreactor.kotlin + reactor-kotlin-extensions + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + + + co.nilin.opex + matching-core + ${matching.version} + + + co.nilin.opex + accountant-core + ${accountant.version} + provided + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + org.springframework + spring-tx + provided + + + + + ${project.basedir}/src/main/kotlin + ${project.basedir}/src/test/kotlin + + + org.jetbrains.kotlin + kotlin-maven-plugin + + + -Xjsr305=strict + + + spring + + + + + org.jetbrains.kotlin + kotlin-maven-allopen + ${kotlin.version} + + + + + + + \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/AggregatedOrderPriceModel.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/AggregatedOrderPriceModel.kt new file mode 100644 index 000000000..adb2ec145 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/AggregatedOrderPriceModel.kt @@ -0,0 +1,6 @@ +package co.nilin.opex.websocket.core.inout + +data class AggregatedOrderPriceModel( + val price: Double?, + val quantity: Double? +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/AllOrderRequest.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/AllOrderRequest.kt new file mode 100644 index 000000000..94087e701 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/AllOrderRequest.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.websocket.core.inout + +import java.util.* + +class AllOrderRequest( + val symbol: String?, + val startTime: Date?, + val endTime: Date?, + val limit: Int? = 500, //Default 500; max 1000. +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/CandleData.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/CandleData.kt new file mode 100644 index 000000000..a1dbf8ee6 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/CandleData.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.websocket.core.inout + +import java.time.LocalDateTime + +data class CandleData( + val openTime: LocalDateTime, + val closeTime: LocalDateTime, + val open: Double, + val close: Double, + val high: Double, + val low: Double, + val volume: Double, + val quoteAssetVolume: Double, + val trades: Int, + val takerBuyBaseAssetVolume: Double, + val takerBuyQuoteAssetVolume: Double, +) diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/MarketTradeResponse.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/MarketTradeResponse.kt new file mode 100644 index 000000000..e02c464f3 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/MarketTradeResponse.kt @@ -0,0 +1,15 @@ +package co.nilin.opex.websocket.core.inout + +import java.math.BigDecimal +import java.util.* + +data class MarketTradeResponse( + val symbol: String, + val id: Long, + val price: BigDecimal, + val qty: BigDecimal, + val quoteQty: BigDecimal, + val time: Date, + val isBestMatch: Boolean, + val isMakerBuyer: Boolean +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/OrderBookResponse.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/OrderBookResponse.kt new file mode 100644 index 000000000..f3162f29a --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/OrderBookResponse.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.websocket.core.inout + +import java.math.BigDecimal + +data class OrderBookResponse( + val price: BigDecimal?, + val quantity: BigDecimal? +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/OrderEnums.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/OrderEnums.kt new file mode 100644 index 000000000..9e986f885 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/OrderEnums.kt @@ -0,0 +1,41 @@ +package co.nilin.opex.websocket.core.inout + +enum class TimeInForce { + GTC, //Good Til Canceled, An order will be on the book unless the order is canceled. + IOC, //Immediate Or Cancel, An order will try to fill the order as much as it can before the order expires. + FOK, //Fill or Kill, An order will expire if the full order cannot be filled upon execution. +} + +enum class OrderStatus(val code: Int) { + + NEW(1), //The order has been accepted by the engine. + PARTIALLY_FILLED(4), //A part of the order has been filled. + FILLED(5), //The order has been completed. + CANCELED(2), //The order has been canceled by the user. + PENDING_CANCEL(7), //Currently unused + REJECTED(3), //The order was not accepted by the engine and not processed. + EXPIRED(6) //The order was canceled according to the order type's rules (e.g. LIMIT FOK orders with no fill, LIMIT IOC or MARKET orders that partially fill) or by the exchange, (e.g. orders canceled during liquidation, orders canceled during maintenance) +} + +enum class OrderType { + LIMIT, // timeInForce, quantity, price + MARKET, // quantity or quoteOrderQty + STOP_LOSS, // quantity, stopPrice + STOP_LOSS_LIMIT, // timeInForce, quantity, price, stopPrice + TAKE_PROFIT, // quantity, stopPrice + TAKE_PROFIT_LIMIT, // timeInForce, quantity, price, stopPrice + LIMIT_MAKER; // quantity, price + + companion object { + fun activeTypes() = listOf(LIMIT, MARKET) + } +} + +enum class OrderSide { + BUY, + SELL +} + +enum class OrderResponseType { + ACK, RESULT, FULL +} \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/PriceChangeResponse.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/PriceChangeResponse.kt new file mode 100644 index 000000000..ca7592895 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/PriceChangeResponse.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.websocket.core.inout + +data class PriceChangeResponse( + val symbol: String, + val priceChange: Double = 0.0, + val priceChangePercent: Double = 0.0, + val weightedAvgPrice: Double = 0.0, + val lastPrice: Double = 0.0, + val lastQty: Double = 0.0, + val bidPrice: Double = 0.0, + val askPrice: Double = 0.0, + val openPrice: Double = 0.0, + val highPrice: Double = 0.0, + val lowPrice: Double = 0.0, + val volume: Double = 0.0, + val openTime: Long, + val closeTime: Long, + val firstId: Long = 0, + val lastId: Long = 0, + val count: Long = 0, +) diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/PriceTickerResponse.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/PriceTickerResponse.kt new file mode 100644 index 000000000..1cb639b76 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/PriceTickerResponse.kt @@ -0,0 +1,6 @@ +package co.nilin.opex.websocket.core.inout + +data class PriceTickerResponse( + val symbol: String?, + val price: String? +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/QueryOrderRequest.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/QueryOrderRequest.kt new file mode 100644 index 000000000..434056408 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/QueryOrderRequest.kt @@ -0,0 +1,7 @@ +package co.nilin.opex.websocket.core.inout + +data class QueryOrderRequest( + val symbol: String, + val orderId: Long?, + val origClientOrderId: String? +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/QueryOrderResponse.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/QueryOrderResponse.kt new file mode 100644 index 000000000..d8c7b8d3b --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/QueryOrderResponse.kt @@ -0,0 +1,26 @@ +package co.nilin.opex.websocket.core.inout + +import java.math.BigDecimal +import java.util.* + +data class QueryOrderResponse( + val symbol: String, + val ouid: String, + val orderId: Long, + val orderListId: Long, //Unless part of an OCO, the value will always be -1. + val clientOrderId: String, + val price: BigDecimal, + val origQty: BigDecimal, + val executedQty: BigDecimal, + val cummulativeQuoteQty: BigDecimal, + val status: OrderStatus, + val timeInForce: TimeInForce, + val type: OrderType, + val side: OrderSide, + val stopPrice: BigDecimal?, + val icebergQty: BigDecimal?, + val time: Date, + val updateTime: Date, + val isWorking: Boolean, + val origQuoteOrderQty: BigDecimal +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/TradeRequest.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/TradeRequest.kt new file mode 100644 index 000000000..27957150c --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/TradeRequest.kt @@ -0,0 +1,11 @@ +package co.nilin.opex.websocket.core.inout + +import java.util.* + +class TradeRequest( + val symbol: String?, + val fromTrade: Long?, + val startTime: Date?, + val endTime: Date?, + val limit: Int? = 500 //Default 500; max 1000. +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/TradeResponse.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/TradeResponse.kt new file mode 100644 index 000000000..935416938 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/inout/TradeResponse.kt @@ -0,0 +1,21 @@ +package co.nilin.opex.websocket.core.inout + +import java.math.BigDecimal +import java.util.* + +data class TradeResponse( + val symbol: String, + val id: Long, + val orderId: Long, + val orderListId: Long = -1, + val price: BigDecimal, + val qty: BigDecimal, + val quoteQty: BigDecimal, + val commission: BigDecimal, + val commissionAsset: String, + val time: Date, + val isBuyer: Boolean, + val isMaker: Boolean, + val isBestMatch: Boolean, + val isMakerBuyer: Boolean +) \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/EventStreamHandler.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/EventStreamHandler.kt new file mode 100644 index 000000000..d4002ba70 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/EventStreamHandler.kt @@ -0,0 +1,12 @@ +package co.nilin.opex.websocket.core.spi + +import co.nilin.opex.accountant.core.inout.RichOrder +import co.nilin.opex.accountant.core.inout.RichTrade + +interface EventStreamHandler { + + fun handleOrder(order: RichOrder) + + fun handleTrade(trade: RichTrade) + +} \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/MarketQueryHandler.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/MarketQueryHandler.kt new file mode 100644 index 000000000..4676a4379 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/MarketQueryHandler.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.websocket.core.spi + +import co.nilin.opex.websocket.core.inout.* +import kotlinx.coroutines.flow.Flow +import java.time.LocalDateTime + +interface MarketQueryHandler { + + suspend fun getTradeTickerDataBySymbol(symbol: String, startFrom: LocalDateTime): PriceChangeResponse + + suspend fun openBidOrders(symbol: String, limit: Int): List + + suspend fun openAskOrders(symbol: String, limit: Int): List + + suspend fun lastOrder(symbol: String): QueryOrderResponse? + + suspend fun recentTrades(symbol: String, limit: Int): Flow + + suspend fun lastPrice(symbol: String?): List + + suspend fun getCandleInfo( + symbol: String, + interval: String, + startTime: Long?, + endTime: Long?, + limit: Int + ): List + +} \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/SymbolMapper.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/SymbolMapper.kt new file mode 100644 index 000000000..76913d2e0 --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/SymbolMapper.kt @@ -0,0 +1,10 @@ +package co.nilin.opex.websocket.core.spi + +interface SymbolMapper { + + suspend fun map(symbol: String?): String? + + suspend fun unmap(value: String?): String? + + suspend fun getKeyValues(): Map +} \ No newline at end of file diff --git a/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/UserQueryHandler.kt b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/UserQueryHandler.kt new file mode 100644 index 000000000..45263588a --- /dev/null +++ b/Websocket/websocket-core/src/main/kotlin/co/nilin/opex/websocket/core/spi/UserQueryHandler.kt @@ -0,0 +1,16 @@ +package co.nilin.opex.websocket.core.spi + +import co.nilin.opex.websocket.core.inout.* +import kotlinx.coroutines.flow.Flow +import java.security.Principal + +interface UserQueryHandler { + + suspend fun queryOrder(principal: Principal, request: QueryOrderRequest): QueryOrderResponse? + + suspend fun openOrders(principal: Principal, symbol: String?): Flow + + suspend fun allOrders(principal: Principal, allOrderRequest: AllOrderRequest): Flow + + suspend fun allTrades(principal: Principal, request: TradeRequest): Flow +} \ No newline at end of file diff --git a/Websocket/websocket-ports/.gitignore b/Websocket/websocket-ports/.gitignore new file mode 100644 index 000000000..549e00a2a --- /dev/null +++ b/Websocket/websocket-ports/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/.gitignore b/Websocket/websocket-ports/websocket-eventlistener-kafka/.gitignore new file mode 100644 index 000000000..bb9840a17 --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/.gitignore @@ -0,0 +1,34 @@ +HELP.md +target/ +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr +.mvn/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ + + +### VS Code ### +.vscode/ + +.DS_Store diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml b/Websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml new file mode 100644 index 000000000..2a2d9f3c4 --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/pom.xml @@ -0,0 +1,119 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.4.4 + + + co.nilin.opex + websocket-eventlistener-kafka + 1.0-SNAPSHOT + websocket-eventlistener-kafka + + + 1.8 + 1.4.31 + ${version} + ${version} + ${version} + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-webflux + + + co.nilin.opex + matching-core + ${matching.version} + provided + + + co.nilin.opex + accountant-core + ${accountant.version} + provided + + + co.nilin.opex + websocket-core + ${websocket.version} + provided + + + org.springframework.kafka + spring-kafka + + + io.projectreactor.kotlin + reactor-kotlin-extensions + + + org.jetbrains.kotlin + kotlin-reflect + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + + + org.springframework.kafka + spring-kafka-test + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + + ${project.basedir}/src/main/kotlin + ${project.basedir}/src/test/kotlin + + + org.jetbrains.kotlin + kotlin-maven-plugin + + + -Xjsr305=strict + + + spring + + + + + org.jetbrains.kotlin + kotlin-maven-allopen + ${kotlin.version} + + + + + + + diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/config/WebSocketKafkaConfig.kt b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/config/WebSocketKafkaConfig.kt new file mode 100644 index 000000000..905b33c00 --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/config/WebSocketKafkaConfig.kt @@ -0,0 +1,96 @@ +package co.nilin.opex.port.websocket.kafka.config + +import co.nilin.opex.matching.core.eventh.events.CoreEvent +import co.nilin.opex.port.websocket.kafka.consumer.OrderKafkaListener +import co.nilin.opex.port.websocket.kafka.consumer.TradeKafkaListener +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.support.GenericApplicationContext +import org.springframework.kafka.core.* +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.kafka.support.serializer.JsonSerializer +import java.util.regex.Pattern + +@Configuration +class WebSocketKafkaConfig { + + @Value("\${spring.kafka.bootstrap-servers}") + private val bootstrapServers: String? = null + + @Value("\${spring.kafka.consumer.group-id}") + private val groupId: String? = null + + @Bean("websocketConsumerConfig") + fun consumerConfigs(): Map? { + val props: MutableMap = HashMap() + props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + props[ConsumerConfig.GROUP_ID_CONFIG] = groupId + props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java + props[JsonDeserializer.TRUSTED_PACKAGES] = "co.nilin.opex.*" + return props + } + + @Bean("websocketConsumerFactory") + fun consumerFactory(@Qualifier("websocketConsumerConfig") consumerConfigs: Map): ConsumerFactory { + return DefaultKafkaConsumerFactory(consumerConfigs) + } + + @Bean("websocketProducerConfig") + fun producerConfigs(): Map { + val props: MutableMap = HashMap() + props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java + props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java + return props + } + + @Bean("websocketProducerFactory") + fun producerFactory(@Qualifier("websocketProducerConfig") producerConfigs: Map): ProducerFactory { + return DefaultKafkaProducerFactory(producerConfigs) + } + + @Bean("websocketKafkaTemplate") + fun kafkaTemplate(@Qualifier("websocketProducerFactory") producerFactory: ProducerFactory): KafkaTemplate { + return KafkaTemplate(producerFactory) + } + + @Autowired + @ConditionalOnBean(TradeKafkaListener::class) + fun configureTradeListener(tradeListener: TradeKafkaListener, @Qualifier("websocketConsumerFactory") consumerFactory: ConsumerFactory) { + val containerProps = ContainerProperties(Pattern.compile("richTrade")) + containerProps.messageListener = tradeListener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.beanName = "WebsocketTradeKafkaListenerContainer" + container.start() + } + + @Autowired + @ConditionalOnBean(OrderKafkaListener::class) + fun configureOrderListener(orderListener: OrderKafkaListener, @Qualifier("websocketConsumerFactory") consumerFactory: ConsumerFactory) { + val containerProps = ContainerProperties(Pattern.compile("richOrder")) + containerProps.messageListener = orderListener + val container = ConcurrentMessageListenerContainer(consumerFactory, containerProps) + container.beanName = "WebsocketOrderKafkaListenerContainer" + container.start() + } + + @Autowired + fun createTopics(applicationContext: GenericApplicationContext) { + applicationContext.registerBean("topic_richOrder", NewTopic::class.java, "richOrder", 10, 1) + applicationContext.registerBean("topic_richTrade", NewTopic::class.java, "richTrade", 10, 1) + } + + +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/consumer/OrderKafkaListener.kt b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/consumer/OrderKafkaListener.kt new file mode 100644 index 000000000..e9879e2ff --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/consumer/OrderKafkaListener.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.port.websocket.kafka.consumer + +import co.nilin.opex.accountant.core.inout.RichOrder +import co.nilin.opex.port.websocket.kafka.spi.RichOrderListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component + +@Component +class OrderKafkaListener : MessageListener { + + val orderListeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + orderListeners.forEach { tl -> + tl.onOrder(data.value(), data.partition(), data.offset(), data.timestamp()) + } + } + + fun addOrderListener(tl: RichOrderListener) { + orderListeners.add(tl) + } + + fun removeOrderListener(tl: RichOrderListener) { + orderListeners.removeIf { item -> + item.id() == tl.id() + } + } +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/consumer/TradeKafkaListener.kt b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/consumer/TradeKafkaListener.kt new file mode 100644 index 000000000..7072951cd --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/consumer/TradeKafkaListener.kt @@ -0,0 +1,29 @@ +package co.nilin.opex.port.websocket.kafka.consumer + +import co.nilin.opex.accountant.core.inout.RichTrade +import co.nilin.opex.port.websocket.kafka.spi.RichTradeListener +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component + +@Component +class TradeKafkaListener : MessageListener { + + val tradeListeners = arrayListOf() + + override fun onMessage(data: ConsumerRecord) { + tradeListeners.forEach { tl -> + tl.onTrade(data.value(), data.partition(), data.offset(), data.timestamp()) + } + } + + fun addTradeListener(tl: RichTradeListener) { + tradeListeners.add(tl) + } + + fun removeTradeListener(tl: RichTradeListener) { + tradeListeners.removeIf { item -> + item.id() == tl.id() + } + } +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/EventListener.kt b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/EventListener.kt new file mode 100644 index 000000000..b8e111a40 --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/EventListener.kt @@ -0,0 +1,9 @@ +package co.nilin.opex.port.websocket.kafka.spi + +import co.nilin.opex.matching.core.eventh.events.CoreEvent + + +interface EventListener { + fun id(): String + fun onEvent(coreEvent: CoreEvent, partition: Int, offset: Long, timestamp: Long) +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/RichOrderListener.kt b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/RichOrderListener.kt new file mode 100644 index 000000000..326ef97a6 --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/RichOrderListener.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.port.websocket.kafka.spi + +import co.nilin.opex.accountant.core.inout.RichOrder + +interface RichOrderListener { + fun id(): String + fun onOrder(order: RichOrder, partition: Int, offset: Long, timestamp: Long) +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/RichTradeListener.kt b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/RichTradeListener.kt new file mode 100644 index 000000000..b6c596c4f --- /dev/null +++ b/Websocket/websocket-ports/websocket-eventlistener-kafka/src/main/kotlin/co/nilin/opex/port/websocket/kafka/spi/RichTradeListener.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.port.websocket.kafka.spi + +import co.nilin.opex.accountant.core.inout.RichTrade + +interface RichTradeListener { + fun id(): String + fun onTrade(trade: RichTrade, partition: Int, offset: Long, timestamp: Long) +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/.gitignore b/Websocket/websocket-ports/websocket-persister-postgres/.gitignore new file mode 100644 index 000000000..de5a9214d --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/.gitignore @@ -0,0 +1,34 @@ +HELP.md +target/ +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr +.mvn/ +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +.DS_Store diff --git a/Websocket/websocket-ports/websocket-persister-postgres/pom.xml b/Websocket/websocket-ports/websocket-persister-postgres/pom.xml new file mode 100644 index 000000000..1d1091700 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/pom.xml @@ -0,0 +1,130 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.4.4 + + + co.nilin.opex + websocket-persister-postgres + 1.0-SNAPSHOT + websocket-persister-postgres + + + 1.8 + 1.4.31 + ${version} + ${version} + ${version} + ${version} + + + + + co.nilin.opex + matching-core + ${matching.version} + provided + + + co.nilin.opex + websocket-core + ${websocket.version} + provided + + + co.nilin.opex + accountant-core + ${accountant.version} + provided + + + co.nilin.opex + error-handler + ${utility.version} + provided + + + org.springframework.boot + spring-boot-starter-data-r2dbc + + + io.r2dbc + r2dbc-postgresql + runtime + + + org.postgresql + postgresql + runtime + + + + io.projectreactor.kotlin + reactor-kotlin-extensions + + + org.jetbrains.kotlin + kotlin-reflect + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + + + com.google.code.gson + gson + + + io.projectreactor + reactor-test + test + + + + + ${project.basedir}/src/main/kotlin + ${project.basedir}/src/test/kotlin + + + org.jetbrains.kotlin + kotlin-maven-plugin + + + -Xjsr305=strict + + + spring + + + + + org.jetbrains.kotlin + kotlin-maven-allopen + ${kotlin.version} + + + + + + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + + diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/config/PostgresConfig.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/config/PostgresConfig.kt new file mode 100644 index 000000000..0f54814cc --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/config/PostgresConfig.kt @@ -0,0 +1,8 @@ +package co.nilin.opex.port.websocket.postgres.config + +import org.springframework.context.annotation.Configuration +import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories + +@Configuration +@EnableR2dbcRepositories(basePackages = ["co.nilin.opex"]) +class PostgresConfig diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/OrderRepository.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/OrderRepository.kt new file mode 100644 index 000000000..a3338e733 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/OrderRepository.kt @@ -0,0 +1,106 @@ +package co.nilin.opex.port.websocket.postgres.dao + +import co.nilin.opex.matching.core.model.OrderDirection +import co.nilin.opex.websocket.core.inout.AggregatedOrderPriceModel +import co.nilin.opex.port.websocket.postgres.model.OrderModel +import kotlinx.coroutines.flow.Flow +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.query.Param +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.util.* + +@Repository +interface OrderRepository : ReactiveCrudRepository { + + @Query("select * from orders where ouid = :ouid") + fun findByOuid(@Param("ouid") ouid: String): Mono + + @Query("select * from orders where symbol = :symbol and order_id = :orderId") + fun findBySymbolAndOrderId( + @Param("symbol") + symbol: String, + @Param("orderId") + orderId: Long + ): Mono + + @Query("select * from orders where symbol = :symbol and client_order_id = :origClientOrderId") + fun findBySymbolAndClientOrderId( + @Param("symbol") + symbol: String, + @Param("origClientOrderId") + origClientOrderId: String + ): Mono + + @Query("select * from orders where uuid = :uuid and (:symbol is null or symbol = :symbol) and status in (:statuses)") + fun findByUuidAndSymbolAndStatus( + @Param("uuid") + uuid: String, + @Param("symbol") + symbol: String?, + @Param("statuses") + status: Collection + ): Flow + + @Query( + "select * from orders where uuid = :uuid " + + "and (:symbol is null or symbol = :symbol) " + + "and (:startTime is null or update_date >= :startTime)" + + "and (:endTime is null or update_date < :endTime)" + ) + fun findByUuidAndSymbolAndTimeBetween( + @Param("uuid") + uuid: String, + @Param("symbol") + symbol: String?, + @Param("startTime") + startTime: Date?, + @Param("endTime") + endTime: Date? + ): Flow + + @Query( + """ + select price, (sum(quantity) - sum(executed_qty)) as quantity from orders + where symbol = :symbol and side = :direction and status in (:statuses) + group by price + order by price asc + limit :limit + """ + ) + fun findBySymbolAndDirectionAndStatusSortAscendingByPrice( + @Param("symbol") + symbol: String, + @Param("direction") + direction: OrderDirection, + @Param("limit") + limit: Int, + @Param("statuses") + status: Collection + ): Flux + + @Query( + """ + select price, (sum(quantity) - sum(executed_qty)) as quantity from orders + where symbol = :symbol and side = :direction and status in (:statuses) + group by price + order by price desc + limit :limit + """ + ) + fun findBySymbolAndDirectionAndStatusSortDescendingByPrice( + @Param("symbol") + symbol: String, + @Param("direction") + direction: OrderDirection, + @Param("limit") + limit: Int, + @Param("statuses") + status: Collection + ): Flux + + @Query("select * from orders where symbol = :symbol order by create_date desc limit 1") + fun findLastOrderBySymbol(@Param("symbol") symbol: String): Mono +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/SymbolMapRepository.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/SymbolMapRepository.kt new file mode 100644 index 000000000..5d266accc --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/SymbolMapRepository.kt @@ -0,0 +1,18 @@ +package co.nilin.opex.port.websocket.postgres.dao + +import co.nilin.opex.port.websocket.postgres.model.SymbolMapModel +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.query.Param +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono + +@Repository +interface SymbolMapRepository : ReactiveCrudRepository { + + @Query("select * from symbol_maps where symbol = :symbol") + fun findBySymbol(@Param("symbol") symbol: String): Mono + + @Query("select * from symbol_maps where value = :value") + fun findByValue(@Param("value") value: String): Mono +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/TradeRepository.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/TradeRepository.kt new file mode 100644 index 000000000..0dec743b2 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/dao/TradeRepository.kt @@ -0,0 +1,150 @@ +package co.nilin.opex.port.websocket.postgres.dao + +import co.nilin.opex.port.websocket.postgres.model.CandleInfoData +import co.nilin.opex.port.websocket.postgres.model.TradeModel +import co.nilin.opex.port.websocket.postgres.model.TradeTickerData +import kotlinx.coroutines.flow.Flow +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.query.Param +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.time.LocalDateTime +import java.util.* + +@Repository +interface TradeRepository : ReactiveCrudRepository { + + @Query("select * from trades where :ouid in (taker_ouid, maker_ouid) ") + fun findByOuid(@Param("ouid") ouid: String): Flow + + @Query( + """ + select * from trades where :uuid in (taker_uuid, maker_uuid) + and (:fromTrade is null or id > :fromTrade) + and (:symbol is null or symbol = :symbol) + and (:startTime is null or trade_date >= :startTime) + and (:endTime is null or trade_date < :endTime) + """ + ) + fun findByUuidAndSymbolAndTimeBetweenAndTradeIdGreaterThan( + @Param("uuid") + uuid: String, + @Param("symbol") + symbol: String?, + @Param("fromTrade") + fromTrade: Long?, + @Param("startTime") + startTime: Date?, + @Param("endTime") + endTime: Date? + ): Flow + + @Query("select * from trades where symbol = :symbol order by create_date desc limit :limit") + fun findBySymbolSortDescendingByCreateDate( + @Param("symbol") + symbol: String, + @Param("limit") + limit: Int + ): Flow + + @Query( + """ + select symbol, + (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date asc limit 1) as price_change, + ((((select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date asc limit 1))/(select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date asc limit 1))*100) as price_change_percent, + (sum(matched_quantity)/sum(taker_price)) as weighted_avg_price, + (select taker_price from trades where create_date > :date and symbol=t.symbol order by create_date asc limit 1) as last_price, + (select matched_quantity from trades where create_date > :date and symbol=t.symbol order by create_date asc limit 1) as last_qty, + (select price from orders where create_date > :date and symbol=t.symbol and (status=1 or status=4) and side='BID' order by create_date desc limit 1) as bid_price, + (select price from orders where create_date > :date and symbol=t.symbol and (status=1 or status=4) and side='ASK' order by create_date asc limit 1) as ask_price, + (select price from orders where create_date > :date and symbol=t.symbol and (status=1 or status=4) order by create_date desc limit 1) as open_price, + max(taker_price) as high_price, + min(taker_price) as low_price, + sum(matched_quantity) as volume, + (select id from trades where create_date > :date and symbol=t.symbol order by create_date asc limit 1) as first_id, + (select id from trades where create_date > :date and symbol=t.symbol order by create_date desc limit 1) as last_id, + count(id) as count + from trades as t + where create_date > :date + group by symbol + """ + ) + fun tradeTicker(@Param("date") createDate: LocalDateTime): Flux + + @Query( + """ + select symbol, + (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date asc limit 1) as price_change, + ((((select taker_price from trades where create_date > :date and symbol=:symbol order by create_date desc limit 1) - (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date asc limit 1))/(select taker_price from trades where create_date > :date and symbol=:symbol order by create_date asc limit 1))*100) as price_change_percent, + (sum(matched_quantity)/sum(taker_price)) as weighted_avg_price, + (select taker_price from trades where create_date > :date and symbol=:symbol order by create_date asc limit 1) as last_price, + (select matched_quantity from trades where create_date > :date and symbol=:symbol order by create_date asc limit 1) as last_qty, + (select price from orders where create_date > :date and symbol=t.symbol and (status=1 or status=4) and side='BID' order by create_date desc limit 1) as bid_price, + (select price from orders where create_date > :date and symbol=t.symbol and (status=1 or status=4) and side='ASK' order by create_date asc limit 1) as ask_price, + (select price from orders where create_date > :date and symbol=t.symbol and (status=1 or status=4) order by create_date desc limit 1) as open_price, + max(taker_price) as high_price, + min(taker_price) as low_price, + sum(matched_quantity) as volume, + (select id from trades where create_date > :date and symbol=:symbol order by create_date asc limit 1) as first_id, + (select id from trades where create_date > :date and symbol=:symbol order by create_date desc limit 1) as last_id, + count(id) as count + from trades as t + where create_date > :date and symbol = :symbol + group by symbol + """ + ) + fun tradeTickerBySymbol( + @Param("symbol") + symbol: String, + @Param("date") + createDate: LocalDateTime, + ): Mono + + @Query("select * from trades where create_date in (select max(create_date) from trades group by symbol) and symbol = :symbol") + fun findBySymbolGroupBySymbol(@Param("symbol") symbol: String): Flux + + @Query("select * from trades where create_date in (select max(create_date) from trades group by symbol)") + fun findAllGroupBySymbol(): Flux + + @Query( + """ + with intervals as (select * from interval_generator((:startTime), (:endTime), :interval ::INTERVAL)) + select + f.start_time as open_time, + f.end_time as close_time, + (select taker_price from trades tt where symbol = :symbol and tt.create_date >= f.start_time and tt.create_date < f.end_time order by tt.create_date asc limit 1) as open, + max(t.taker_price) as high, + min(t.taker_price) as low, + (select taker_price from trades tt where symbol = :symbol and tt.create_date >= f.start_time and tt.create_date < f.end_time order by tt.create_date desc limit 1) as close, + sum(t.matched_quantity) as volume, + count(id) as trades + from trades t + right join intervals f + on t.create_date >= f.start_time and t.create_date < f.end_time + where symbol = :symbol or symbol is null + group by f.start_time, f.end_time + order by f.end_time desc + limit :limit + """ + ) + suspend fun candleData( + @Param("symbol") + symbol: String, + @Param("interval") + interval: String, + @Param("startTime") + startTime: LocalDateTime, + @Param("endTime") + endTime: LocalDateTime, + @Param("limit") + limit: Int, + ): Flux + + @Query("select * from trades order by create_date desc limit 1") + suspend fun findLastByCreateDate(): Mono + + @Query("select * from trades order by create_date asc limit 1") + suspend fun findFirstByCreateDate(): Mono +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/MarketQueryHandlerImpl.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/MarketQueryHandlerImpl.kt new file mode 100644 index 000000000..eb676717a --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/MarketQueryHandlerImpl.kt @@ -0,0 +1,209 @@ +package co.nilin.opex.port.websocket.postgres.impl + +import co.nilin.opex.websocket.core.inout.* +import co.nilin.opex.websocket.core.spi.MarketQueryHandler +import co.nilin.opex.websocket.core.spi.SymbolMapper +import co.nilin.opex.matching.core.model.OrderDirection +import co.nilin.opex.port.websocket.postgres.dao.OrderRepository +import co.nilin.opex.port.websocket.postgres.dao.TradeRepository +import co.nilin.opex.port.websocket.postgres.model.OrderModel +import co.nilin.opex.port.websocket.postgres.model.TradeTickerData +import co.nilin.opex.port.websocket.postgres.util.* +import co.nilin.opex.port.websocket.postgres.util.toWebSocketOrderType +import co.nilin.opex.port.websocket.postgres.util.toOrderSide +import co.nilin.opex.port.websocket.postgres.util.toOrderStatus +import co.nilin.opex.port.websocket.postgres.util.toTimeInForce +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrElse +import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.stereotype.Component +import java.lang.Exception +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneId +import java.time.ZoneOffset +import java.util.* +import kotlin.math.max +import kotlin.math.min + +@Component +class MarketQueryHandlerImpl( + private val orderRepository: OrderRepository, + private val tradeRepository: TradeRepository, + private val symbolMapper: SymbolMapper, +) : MarketQueryHandler { + + override suspend fun getTradeTickerDataBySymbol(symbol: String, startFrom: LocalDateTime): PriceChangeResponse { + return tradeRepository.tradeTickerBySymbol(symbol, startFrom) + .awaitFirstOrNull() + ?.asPriceChangeResponse(Date().time, startFrom.toInstant(ZoneOffset.UTC).toEpochMilli()) + ?: PriceChangeResponse( + symbol = symbol, + openTime = Date().time, + closeTime = startFrom.toInstant(ZoneOffset.UTC).toEpochMilli() + ) + } + + override suspend fun openBidOrders(symbol: String, limit: Int): List { + return orderRepository.findBySymbolAndDirectionAndStatusSortDescendingByPrice( + symbol, + OrderDirection.BID, + limit, + listOf(OrderStatus.NEW.code, OrderStatus.PARTIALLY_FILLED.code) + ).collectList() + .awaitFirstOrElse { emptyList() } + .map { OrderBookResponse(it.price?.toBigDecimal(), it.quantity?.toBigDecimal()) } + } + + override suspend fun openAskOrders(symbol: String, limit: Int): List { + return orderRepository.findBySymbolAndDirectionAndStatusSortAscendingByPrice( + symbol, + OrderDirection.ASK, + limit, + listOf(OrderStatus.NEW.code, OrderStatus.PARTIALLY_FILLED.code) + ).collectList() + .awaitFirstOrElse { emptyList() } + .map { OrderBookResponse(it.price?.toBigDecimal(), it.quantity?.toBigDecimal()) } + } + + override suspend fun lastOrder(symbol: String): QueryOrderResponse? { + return orderRepository.findLastOrderBySymbol(symbol) + .awaitFirstOrNull() + ?.asQueryOrderResponse() + } + + override suspend fun recentTrades(symbol: String, limit: Int): Flow { + return tradeRepository.findBySymbolSortDescendingByCreateDate(symbol, limit) + .map { + val takerOrder = orderRepository.findByOuid(it.takerOuid).awaitFirst() + val makerOrder = orderRepository.findByOuid(it.makerOuid).awaitFirst() + val isMakerBuyer = makerOrder.direction == OrderDirection.BID + MarketTradeResponse( + it.symbol, + it.tradeId, + if (isMakerBuyer) it.makerPrice.toBigDecimal() else it.takerPrice.toBigDecimal(), + it.matchedQuantity.toBigDecimal(), + if (isMakerBuyer) + makerOrder.quoteQuantity!!.toBigDecimal() + else + takerOrder.quoteQuantity!!.toBigDecimal(), + Date.from(it.createDate.atZone(ZoneId.systemDefault()).toInstant()), + true, + isMakerBuyer + ) + } + } + + override suspend fun lastPrice(symbol: String?): List { + val list = if (symbol.isNullOrEmpty()) + tradeRepository.findAllGroupBySymbol() + else + tradeRepository.findBySymbolGroupBySymbol(symbol) + return list.collectList() + .awaitFirstOrElse { emptyList() } + .map { + val makerOrder = orderRepository.findByOuid(it.makerOuid).awaitFirst() + val websocketSymbol = try { + symbolMapper.map(it.symbol) + } catch (e: Exception) { + it.symbol + } + val isMakerBuyer = makerOrder.direction == OrderDirection.BID + PriceTickerResponse( + websocketSymbol, + if (isMakerBuyer) + min(it.takerPrice, it.makerPrice).toString() + else + max(it.takerPrice, it.makerPrice).toString() + ) + } + + } + + override suspend fun getCandleInfo( + symbol: String, + interval: String, + startTime: Long?, + endTime: Long?, + limit: Int + ): List { + val st = if (startTime == null) + tradeRepository.findFirstByCreateDate().awaitFirstOrNull()?.createDate + ?: LocalDateTime.now() + else + with(Instant.ofEpochMilli(startTime)) { + LocalDateTime.ofInstant(this, ZoneId.systemDefault()) + } + + val et = if (endTime == null) + tradeRepository.findLastByCreateDate().awaitFirstOrNull()?.createDate + ?: LocalDateTime.now() + else + with(Instant.ofEpochMilli(endTime)) { + LocalDateTime.ofInstant(this, ZoneId.systemDefault()) + } + + return tradeRepository.candleData(symbol, interval, st, et, limit) + .collectList() + .awaitFirstOrElse { emptyList() } + .map { + CandleData( + it.openTime, + it.closeTime, + it.open ?: 0.0, + it.close ?: 0.0, + it.high ?: 0.0, + it.low ?: 0.0, + it.volume ?: 0.0, + 0.0, + it.trades, + 0.0, + 0.0 + ) + } + } + + private fun OrderModel.asQueryOrderResponse() = QueryOrderResponse( + symbol, + ouid, + orderId ?: -1, + -1, + clientOrderId ?: "", + price!!.toBigDecimal(), + quantity!!.toBigDecimal(), + executedQuantity!!.toBigDecimal(), + (accumulativeQuoteQty ?: 0.0).toBigDecimal(), + status!!.toOrderStatus(), + constraint!!.toTimeInForce(), + type!!.toWebSocketOrderType(), + direction!!.toOrderSide(), + null, + null, + Date.from(createDate!!.atZone(ZoneId.systemDefault()).toInstant()), + Date.from(updateDate.atZone(ZoneId.systemDefault()).toInstant()), + status.toOrderStatus().isWorking(), + quoteQuantity!!.toBigDecimal() + ) + + private fun TradeTickerData.asPriceChangeResponse(openTime: Long, closeTime: Long) = PriceChangeResponse( + symbol, + priceChange ?: 0.0, + priceChangePercent ?: 0.0, + weightedAvgPrice ?: 0.0, + lastPrice ?: 0.0, + lastQty ?: 0.0, + bidPrice ?: 0.0, + askPrice ?: 0.0, + openPrice ?: 0.0, + highPrice ?: 0.0, + lowPrice ?: 0.0, + volume ?: 0.0, + openTime, + closeTime, + firstId ?: -1, + lastId ?: -1, + count ?: 0 + ) +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/SymbolMapperImpl.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/SymbolMapperImpl.kt new file mode 100644 index 000000000..e29132356 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/SymbolMapperImpl.kt @@ -0,0 +1,30 @@ +package co.nilin.opex.port.websocket.postgres.impl + +import co.nilin.opex.websocket.core.spi.SymbolMapper +import co.nilin.opex.port.websocket.postgres.dao.SymbolMapRepository +import kotlinx.coroutines.reactive.awaitFirstOrElse +import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.stereotype.Component + +@Component +class SymbolMapperImpl(val symbolMapRepository: SymbolMapRepository) : SymbolMapper { + + override suspend fun map(symbol: String?): String? { + if (symbol == null) return null + return symbolMapRepository.findBySymbol(symbol).awaitFirstOrNull()?.value + } + + override suspend fun unmap(value: String?): String? { + if (value == null) return null + return symbolMapRepository.findByValue(value).awaitFirstOrNull()?.symbol + } + + override suspend fun getKeyValues(): Map { + val map = HashMap() + symbolMapRepository.findAll() + .collectList() + .awaitFirstOrElse { emptyList() } + .forEach { map[it.symbol] = it.value } + return map + } +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/UserQueryHandlerImpl.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/UserQueryHandlerImpl.kt new file mode 100644 index 000000000..2e989bcf0 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/impl/UserQueryHandlerImpl.kt @@ -0,0 +1,139 @@ +package co.nilin.opex.port.websocket.postgres.impl + +import co.nilin.opex.websocket.core.inout.* +import co.nilin.opex.websocket.core.spi.UserQueryHandler +import co.nilin.opex.matching.core.model.OrderDirection +import co.nilin.opex.port.websocket.postgres.dao.OrderRepository +import co.nilin.opex.port.websocket.postgres.dao.TradeRepository +import co.nilin.opex.port.websocket.postgres.model.OrderModel +import co.nilin.opex.port.websocket.postgres.util.* +import co.nilin.opex.port.websocket.postgres.util.toWebSocketOrderType +import co.nilin.opex.port.websocket.postgres.util.toOrderSide +import co.nilin.opex.port.websocket.postgres.util.toOrderStatus +import co.nilin.opex.port.websocket.postgres.util.toTimeInForce +import co.nilin.opex.utility.error.data.OpexError +import co.nilin.opex.utility.error.data.OpexException +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.reactive.awaitFirst +import kotlinx.coroutines.reactive.awaitFirstOrNull +import org.springframework.stereotype.Component +import java.security.Principal +import java.time.ZoneId +import java.util.* + +@Component +class UserQueryHandlerImpl( + val orderRepository: OrderRepository, + val tradeRepository: TradeRepository +) : UserQueryHandler { + + override suspend fun queryOrder(principal: Principal, request: QueryOrderRequest): QueryOrderResponse? { + val order = (if (request.origClientOrderId != null) { + orderRepository.findBySymbolAndClientOrderId(request.symbol, request.origClientOrderId!!) + } else { + orderRepository.findBySymbolAndOrderId(request.symbol, request.orderId!!) + + }).awaitFirstOrNull() + if (order?.constraint != null) { + if (order.uuid != principal.name) + throw OpexException(OpexError.Forbidden) + return orderToQueryResponse(order) + } + return null + } + + override suspend fun openOrders(principal: Principal, symbol: String?): Flow { + return orderRepository.findByUuidAndSymbolAndStatus( + principal.name, + symbol, + listOf(OrderStatus.NEW.code, OrderStatus.PARTIALLY_FILLED.code) + ).filter { orderModel -> orderModel.constraint != null } + .map { order -> orderToQueryResponse(order) } + } + + override suspend fun allOrders(principal: Principal, allOrderRequest: AllOrderRequest): Flow { + return orderRepository.findByUuidAndSymbolAndTimeBetween( + principal.name, + allOrderRequest.symbol, + allOrderRequest.startTime, + allOrderRequest.endTime + ).filter { orderModel -> orderModel.constraint != null } + .map { order -> orderToQueryResponse(order) } + } + + override suspend fun allTrades(principal: Principal, request: TradeRequest): Flow { + return tradeRepository.findByUuidAndSymbolAndTimeBetweenAndTradeIdGreaterThan( + principal.name, request.symbol, request.fromTrade, request.startTime, request.endTime + ).map { trade -> + val takerOrder = orderRepository.findByOuid(trade.takerOuid).awaitFirst() + val makerOrder = orderRepository.findByOuid(trade.makerOuid).awaitFirst() + val isMakerBuyer = makerOrder.direction == OrderDirection.BID + TradeResponse( + trade.symbol, + trade.tradeId, + if (trade.takerUuid == principal.name) { + takerOrder.orderId!! + } else { + makerOrder.orderId!! + }, + -1, + if (trade.takerUuid == principal.name) { + trade.takerPrice.toBigDecimal() + } else { + trade.makerPrice.toBigDecimal() + }, + trade.matchedQuantity.toBigDecimal(), + if (isMakerBuyer) { + makerOrder.quoteQuantity!!.toBigDecimal() + } else { + takerOrder.quoteQuantity!!.toBigDecimal() + }, + if (trade.takerUuid == principal.name) { + trade.takerCommision!!.toBigDecimal() + } else { + trade.makerCommision!!.toBigDecimal() + }, + if (trade.takerUuid == principal.name) { + trade.takerCommisionAsset!! + } else { + trade.makerCommisionAsset!! + }, + Date.from( + trade.createDate.atZone(ZoneId.systemDefault()).toInstant() + ), + if (trade.takerUuid == principal.name) { + OrderDirection.ASK == takerOrder.direction + } else { + OrderDirection.ASK == makerOrder.direction + }, + trade.makerUuid == principal.name, + true, + isMakerBuyer + ) + } + } + + + private fun orderToQueryResponse(order: OrderModel) = QueryOrderResponse( + order.symbol, + order.ouid, + order.orderId ?: -1, + -1, + order.clientOrderId ?: "", + order.price!!.toBigDecimal(), + order.quantity!!.toBigDecimal(), + order.executedQuantity!!.toBigDecimal(), + (order.accumulativeQuoteQty ?: 0.0).toBigDecimal(), + order.status!!.toOrderStatus(), + order.constraint!!.toTimeInForce(), + order.type!!.toWebSocketOrderType(), + order.direction!!.toOrderSide(), + null, + null, + Date.from(order.createDate!!.atZone(ZoneId.systemDefault()).toInstant()), + Date.from(order.updateDate.atZone(ZoneId.systemDefault()).toInstant()), + order.status.toOrderStatus().isWorking(), order.quoteQuantity!!.toBigDecimal() + ) +} \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/CandleInfoData.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/CandleInfoData.kt new file mode 100644 index 000000000..4fd205c2b --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/CandleInfoData.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.port.websocket.postgres.model + +import org.springframework.data.relational.core.mapping.Column +import java.time.LocalDateTime + +data class CandleInfoData( + @Column("open_time") + val openTime: LocalDateTime, + @Column("close_time") + val closeTime: LocalDateTime, + val open: Double?, + val close: Double?, + val high: Double?, + val low: Double?, + val volume: Double?, + val trades: Int, +) \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/OrderModel.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/OrderModel.kt new file mode 100644 index 000000000..20a1dad95 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/OrderModel.kt @@ -0,0 +1,42 @@ +package co.nilin.opex.port.websocket.postgres.model + + +import co.nilin.opex.matching.core.model.MatchConstraint +import co.nilin.opex.matching.core.model.OrderDirection +import co.nilin.opex.matching.core.model.OrderType +import org.springframework.data.annotation.Id +import org.springframework.data.annotation.Version +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table +import java.time.LocalDateTime + +@Table("orders") +class OrderModel( + @Id var id: Long?, + @Column(value = "ouid") + val ouid: String, + val uuid: String, + @Column(value = "client_order_id") + val clientOrderId: String?, + val symbol: String, + @Column(value = "order_id") val orderId: Long?, + @Column("maker_fee") val makerFee: Double?, + @Column("taker_fee") val takerFee: Double?, + @Column("left_side_fraction") val leftSideFraction: Double?, + @Column("right_side_fraction") val rightSideFraction: Double?, + @Column("user_level") val userLevel: String?, + @Column("side") val direction: OrderDirection?, + @Column("match_constraint") val constraint: MatchConstraint?, + @Column("order_type") val type: OrderType?, + @Column("price") val price: Double?, + @Column("quantity") val quantity: Double?, + @Column("quote_quantity") val quoteQuantity: Double?, + @Column("executed_qty") val executedQuantity: Double?, + @Column("accumulative_quote_qty") val accumulativeQuoteQty: Double?, + @Column("status") val status: Int?, + @Column("create_date") val createDate: LocalDateTime?, + @Column("update_date") val updateDate: LocalDateTime, + @Version + @Column("version") + var version: Long? = null +) \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/SymbolMapModel.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/SymbolMapModel.kt new file mode 100644 index 000000000..6c8d483bc --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/SymbolMapModel.kt @@ -0,0 +1,12 @@ +package co.nilin.opex.port.websocket.postgres.model + + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table + +@Table("symbol_maps") +class SymbolMapModel( + @Id val symbol: String, + @Column("value") val value: String, +) \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/TradeModel.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/TradeModel.kt new file mode 100644 index 000000000..e05a5c785 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/TradeModel.kt @@ -0,0 +1,27 @@ +package co.nilin.opex.port.websocket.postgres.model + + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Column +import org.springframework.data.relational.core.mapping.Table +import java.time.LocalDateTime + +@Table("trades") +class TradeModel( + @Id var id: Long?, + @Column("trade_id") val tradeId: Long, + val symbol: String, + @Column("matched_quantity") val matchedQuantity: Double, + @Column("taker_price") val takerPrice: Double, + @Column("maker_price") val makerPrice: Double, + @Column("taker_commision") val takerCommision: Double?, + @Column("maker_commision") val makerCommision: Double?, + @Column("taker_commision_asset") val takerCommisionAsset: String?, + @Column("maker_commision_asset") val makerCommisionAsset: String?, + @Column("trade_date") val tradeDate: LocalDateTime, + @Column("maker_ouid") val makerOuid: String, + @Column("taker_ouid") val takerOuid: String, + @Column("maker_uuid") val makerUuid: String, + @Column("taker_uuid") val takerUuid: String, + @Column("create_date") val createDate: LocalDateTime +) \ No newline at end of file diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/TradeTickerData.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/TradeTickerData.kt new file mode 100644 index 000000000..96f094a0f --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/model/TradeTickerData.kt @@ -0,0 +1,33 @@ +package co.nilin.opex.port.websocket.postgres.model + +import org.springframework.data.relational.core.mapping.Column + +data class TradeTickerData( + val symbol: String, + @Column("price_change") + val priceChange: Double?, + @Column("price_change_percent") + val priceChangePercent: Double?, + @Column("weighted_avg_price") + val weightedAvgPrice: Double?, + @Column("last_price") + val lastPrice: Double?, + @Column("last_qty") + val lastQty: Double?, + @Column("bid_price") + val bidPrice: Double?, + @Column("ask_price") + val askPrice: Double?, + @Column("open_price") + val openPrice: Double?, + @Column("high_price") + val highPrice: Double?, + @Column("low_price") + val lowPrice: Double?, + val volume: Double?, + @Column("first_id") + val firstId: Long?, + @Column("last_id") + val lastId: Long?, + val count: Long?, +) diff --git a/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/util/EnumExtensions.kt b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/util/EnumExtensions.kt new file mode 100644 index 000000000..c84e303a0 --- /dev/null +++ b/Websocket/websocket-ports/websocket-persister-postgres/src/main/kotlin/co/nilin/opex/port/websocket/postgres/util/EnumExtensions.kt @@ -0,0 +1,45 @@ +package co.nilin.opex.port.websocket.postgres.util + +import co.nilin.opex.websocket.core.inout.OrderSide +import co.nilin.opex.websocket.core.inout.OrderStatus +import co.nilin.opex.websocket.core.inout.TimeInForce +import co.nilin.opex.matching.core.model.MatchConstraint +import co.nilin.opex.matching.core.model.OrderDirection +import co.nilin.opex.matching.core.model.OrderType + +fun MatchConstraint.toTimeInForce(): TimeInForce { + if (this == MatchConstraint.FOK_BUDGET) + return TimeInForce.FOK + if (this == MatchConstraint.IOC_BUDGET) + return TimeInForce.IOC + return TimeInForce.valueOf(this.name) +} + + +fun TimeInForce.toMatchConstraint(): MatchConstraint { + return MatchConstraint.valueOf(this.name) +} + +fun OrderType.toWebSocketOrderType(): co.nilin.opex.websocket.core.inout.OrderType { + if (this == OrderType.LIMIT_ORDER) + return co.nilin.opex.websocket.core.inout.OrderType.LIMIT + if (this == OrderType.MARKET_ORDER) + return co.nilin.opex.websocket.core.inout.OrderType.MARKET + throw IllegalArgumentException("OrderType $this is not supported!") +} + +fun OrderDirection.toOrderSide(): OrderSide { + if (this == OrderDirection.BID) + return OrderSide.BUY + return OrderSide.SELL +} + +fun OrderStatus.isWorking(): Boolean { + return listOf(OrderStatus.NEW, OrderStatus.PARTIALLY_FILLED).contains(this) +} + +fun Int.toOrderStatus(): OrderStatus { + val status = co.nilin.opex.accountant.core.inout.OrderStatus.values() + .find { s -> s.code == this } + return OrderStatus.valueOf(status!!.name) +} \ No newline at end of file diff --git a/Websocket/websocket-root.iml b/Websocket/websocket-root.iml new file mode 100644 index 000000000..4fd5057cb --- /dev/null +++ b/Websocket/websocket-root.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file