diff --git a/xchange-binance/src/main/java/org/knowm/xchange/binance/dto/marketdata/BinanceBookTicker.java b/xchange-binance/src/main/java/org/knowm/xchange/binance/dto/marketdata/BinanceBookTicker.java index e8fb8f96e4..411b818076 100644 --- a/xchange-binance/src/main/java/org/knowm/xchange/binance/dto/marketdata/BinanceBookTicker.java +++ b/xchange-binance/src/main/java/org/knowm/xchange/binance/dto/marketdata/BinanceBookTicker.java @@ -2,19 +2,24 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.math.BigDecimal; +import java.util.Date; import lombok.Getter; +import lombok.Setter; import org.knowm.xchange.binance.BinanceAdapters; import org.knowm.xchange.dto.marketdata.Ticker; @Getter public final class BinanceBookTicker { + + @Setter public long updateId; private final BigDecimal bidPrice; private final BigDecimal bidQty; private final BigDecimal askPrice; private final BigDecimal askQty; private final String symbol; - + private final long eventTime; + private final long transactionTime; // The cached ticker private Ticker ticker; @@ -23,16 +28,16 @@ public BinanceBookTicker( @JsonProperty("bidQty") BigDecimal bidQty, @JsonProperty("askPrice") BigDecimal askPrice, @JsonProperty("askQty") BigDecimal askQty, - @JsonProperty("symbol") String symbol) { + @JsonProperty("symbol") String symbol, + @JsonProperty("E") long eventTime, + @JsonProperty("T") long transactionTime) { this.bidPrice = bidPrice; this.bidQty = bidQty; this.askPrice = askPrice; this.askQty = askQty; this.symbol = symbol; - } - - public void setUpdateId(long updateId) { - this.updateId = updateId; + this.eventTime = eventTime; + this.transactionTime = transactionTime; } public synchronized Ticker toTicker(boolean isFuture) { @@ -44,6 +49,7 @@ public synchronized Ticker toTicker(boolean isFuture) { .bid(bidPrice) .askSize(askQty) .bidSize(bidQty) + .timestamp(new Date(transactionTime)) .build(); } return ticker; diff --git a/xchange-core/src/main/java/org/knowm/xchange/dto/marketdata/OrderBook.java b/xchange-core/src/main/java/org/knowm/xchange/dto/marketdata/OrderBook.java index e7c8adabd3..4dc6731665 100644 --- a/xchange-core/src/main/java/org/knowm/xchange/dto/marketdata/OrderBook.java +++ b/xchange-core/src/main/java/org/knowm/xchange/dto/marketdata/OrderBook.java @@ -235,7 +235,7 @@ private boolean recheckIdx(List limitOrders, LimitOrder limitOrder, // Replace timeStamp if the provided date is non-null and in the future // TODO should this raise an exception if the order timestamp is in the past? - private void updateDate(Date updateDate) { + public void updateDate(Date updateDate) { if (updateDate != null && (timeStamp == null || updateDate.after(timeStamp))) { this.timeStamp = updateDate; diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java index 5fed0e45f9..ae2d7c953b 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.java @@ -70,6 +70,7 @@ import org.slf4j.LoggerFactory; public class BinanceStreamingMarketDataService implements StreamingMarketDataService { + private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingMarketDataService.class); @@ -99,6 +100,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer private Observable> allRollingWindowTickerSubscriptions; private Map fundingRateInfoMap; private Disposable fundingRateInfoUpdate; + private final Map> fundingRateInfoSubscriptions; /** * A scheduler for initialisation of binance order book snapshots, which is delegated to a @@ -141,6 +143,7 @@ public BinanceStreamingMarketDataService( this.orderBookRawUpdatesSubscriptions = new ConcurrentHashMap<>(); this.klineSubscriptions = new ConcurrentHashMap<>(); this.fundingRateInfoMap = new ConcurrentHashMap<>(); + this.fundingRateInfoSubscriptions = new ConcurrentHashMap<>(); } @Override @@ -193,26 +196,7 @@ public Observable getTrades(Instrument instrument, Object... args) { @Override public Observable getFundingRate(Instrument instrument, Object... args) { - // init update info for funding rate interval - synchronized (this) { - if (fundingRateInfoUpdate == null) { - fundingRateInfoUpdate = - Observable.interval(10, 10, TimeUnit.MINUTES).subscribe(x -> updateFundingRateInfo()); - updateFundingRateInfo(); - } - } - return service - .subscribeChannel( - channelFromCurrency(instrument, BinanceSubscriptionType.FUNDING_RATES.getType())) - .map( - it -> - this.readTransaction( - it, FUNDING_RATE_TYPE, "funding rate")) - .map(BinanceWebsocketTransaction::getData) - .filter(data -> BinanceAdapters.adaptSymbol(data.getSymbol(), true).equals(instrument)) - .map( - transaction -> - transaction.toFundingRate((fundingRateInfoMap.getOrDefault(instrument, 8)))); + return rawFundingRate(instrument); } private void updateFundingRateInfo() { @@ -400,7 +384,7 @@ public void openSubscriptions( productSubscription.getTicker().forEach(this::initTickerSubscription); productSubscription.getOrderBook().forEach(this::initRawOrderBookUpdatesSubscription); productSubscription.getTrades().forEach(this::initTradeSubscription); - productSubscription.getFundingRates().forEach(this::initTradeSubscription); + productSubscription.getFundingRates().forEach(this::initFundingRateSubscription); } private void initKlineSubscription(Instrument instrument, Set klineIntervals) { @@ -525,6 +509,41 @@ private void initRawOrderBookUpdatesSubscription(Instrument instrument) { instrument, triggerObservableBody(rawOrderBookUpdates(instrument))); } + private void initFundingRateSubscription(Instrument instrument) { + fundingRateInfoSubscriptions.put( + instrument, triggerObservableBody(rawFundingRate(instrument)).share()); + } + + private Observable rawFundingRate(Instrument instrument) { + if (!service.isLiveSubscriptionEnabled() + && !service.getProductSubscription().getFundingRates().contains(instrument)) { + throw new UpFrontSubscriptionRequiredException(); + } + return fundingRateInfoSubscriptions.computeIfAbsent( + instrument, s -> triggerObservableBody(rawFundingRateStream(instrument)).share()); + } + + private Observable rawFundingRateStream(Instrument instrument) { + // init update info for funding rate interval + synchronized (this) { + if (fundingRateInfoUpdate == null) { + fundingRateInfoUpdate = + Observable.interval(10, 10, TimeUnit.MINUTES).subscribe(x -> updateFundingRateInfo()); + updateFundingRateInfo(); + } + } + return service + .subscribeChannel( + channelFromCurrency(instrument, BinanceSubscriptionType.FUNDING_RATES.getType())) + .map( + it -> + this.readTransaction( + it, FUNDING_RATE_TYPE, "funding rate")) + .map(BinanceWebsocketTransaction::getData) + .filter(data -> BinanceAdapters.adaptSymbol(data.getSymbol(), true).equals(instrument)) + .map(transaction -> transaction.toFundingRate(fundingRateInfoMap.getOrDefault(instrument, 8))); + } + private Observable rawTickerStream(Instrument instrument) { return service .subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.TICKER.getType())) @@ -593,6 +612,7 @@ private Observable rawBookTickerStream(Instrument instrument) } private final class OrderbookSubscription { + final Observable stream; final AtomicLong lastUpdateId = new AtomicLong(); final AtomicLong snapshotLastUpdateId = new AtomicLong(); @@ -607,7 +627,9 @@ void invalidateSnapshot() { } void initSnapshotIfInvalid(Instrument instrument) { - if (snapshotLastUpdateId.get() != 0) return; + if (snapshotLastUpdateId.get() != 0) { + return; + } try { LOG.info("Fetching initial orderbook snapshot for {} ", instrument); onApiCall.run(); @@ -867,6 +889,7 @@ private static JavaType getKlineType() { */ @SuppressWarnings("Convert2MethodRef") private final class OrderBookFutureSubscription implements Disposable { + private final Instrument instrument; private final Observable deltasObservable; diff --git a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/market/BookTickerBinanceWebSocketTransaction.java b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/market/BookTickerBinanceWebSocketTransaction.java index eaeac47268..9a75036910 100644 --- a/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/market/BookTickerBinanceWebSocketTransaction.java +++ b/xchange-stream-binance/src/main/java/info/bitrich/xchangestream/binance/dto/market/BookTickerBinanceWebSocketTransaction.java @@ -18,9 +18,11 @@ public BookTickerBinanceWebSocketTransaction( @JsonProperty("b") BigDecimal bidPrice, @JsonProperty("B") BigDecimal bidQty, @JsonProperty("a") BigDecimal askPrice, - @JsonProperty("A") BigDecimal askQty) { + @JsonProperty("A") BigDecimal askQty, + @JsonProperty("E") long eventTime, + @JsonProperty("T") long transactionTime) { super(BinanceWebSocketTypes.BOOK_TICKER, new Date()); - ticker = new BinanceBookTicker(bidPrice, bidQty, askPrice, askQty, symbol); + ticker = new BinanceBookTicker(bidPrice, bidQty, askPrice, askQty, symbol, eventTime, transactionTime); ticker.setUpdateId(updateId); } }