Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
import java.util.Date;
import lombok.Getter;
import lombok.Setter;
import org.knowm.xchange.binance.BinanceAdapters;
import org.knowm.xchange.dto.marketdata.Ticker;

@Getter
public final class BinanceBookTicker {

@Setter
public long updateId;
private final BigDecimal bidPrice;
private final BigDecimal bidQty;
private final BigDecimal askPrice;
private final BigDecimal askQty;
private final String symbol;

private final long eventTime;
private final long transactionTime;
// The cached ticker
private Ticker ticker;

Expand All @@ -23,16 +28,16 @@ public BinanceBookTicker(
@JsonProperty("bidQty") BigDecimal bidQty,
@JsonProperty("askPrice") BigDecimal askPrice,
@JsonProperty("askQty") BigDecimal askQty,
@JsonProperty("symbol") String symbol) {
@JsonProperty("symbol") String symbol,
@JsonProperty("E") long eventTime,
@JsonProperty("T") long transactionTime) {
this.bidPrice = bidPrice;
this.bidQty = bidQty;
this.askPrice = askPrice;
this.askQty = askQty;
this.symbol = symbol;
}

public void setUpdateId(long updateId) {
this.updateId = updateId;
this.eventTime = eventTime;
this.transactionTime = transactionTime;
}

public synchronized Ticker toTicker(boolean isFuture) {
Expand All @@ -44,6 +49,7 @@ public synchronized Ticker toTicker(boolean isFuture) {
.bid(bidPrice)
.askSize(askQty)
.bidSize(bidQty)
.timestamp(new Date(transactionTime))
.build();
}
return ticker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private boolean recheckIdx(List<LimitOrder> limitOrders, LimitOrder limitOrder,

// Replace timeStamp if the provided date is non-null and in the future
// TODO should this raise an exception if the order timestamp is in the past?
private void updateDate(Date updateDate) {
public void updateDate(Date updateDate) {

if (updateDate != null && (timeStamp == null || updateDate.after(timeStamp))) {
this.timeStamp = updateDate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.slf4j.LoggerFactory;

public class BinanceStreamingMarketDataService implements StreamingMarketDataService {

private static final Logger LOG =
LoggerFactory.getLogger(BinanceStreamingMarketDataService.class);

Expand Down Expand Up @@ -99,6 +100,7 @@ public class BinanceStreamingMarketDataService implements StreamingMarketDataSer
private Observable<List<BinanceTicker24h>> allRollingWindowTickerSubscriptions;
private Map<Instrument, Integer> fundingRateInfoMap;
private Disposable fundingRateInfoUpdate;
private final Map<Instrument, Observable<FundingRate>> fundingRateInfoSubscriptions;

/**
* A scheduler for initialisation of binance order book snapshots, which is delegated to a
Expand Down Expand Up @@ -141,6 +143,7 @@ public BinanceStreamingMarketDataService(
this.orderBookRawUpdatesSubscriptions = new ConcurrentHashMap<>();
this.klineSubscriptions = new ConcurrentHashMap<>();
this.fundingRateInfoMap = new ConcurrentHashMap<>();
this.fundingRateInfoSubscriptions = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -193,26 +196,7 @@ public Observable<Trade> getTrades(Instrument instrument, Object... args) {

@Override
public Observable<FundingRate> getFundingRate(Instrument instrument, Object... args) {
// init update info for funding rate interval
synchronized (this) {
if (fundingRateInfoUpdate == null) {
fundingRateInfoUpdate =
Observable.interval(10, 10, TimeUnit.MINUTES).subscribe(x -> updateFundingRateInfo());
updateFundingRateInfo();
}
}
return service
.subscribeChannel(
channelFromCurrency(instrument, BinanceSubscriptionType.FUNDING_RATES.getType()))
.map(
it ->
this.<FundingRateWebsocketTransaction>readTransaction(
it, FUNDING_RATE_TYPE, "funding rate"))
.map(BinanceWebsocketTransaction::getData)
.filter(data -> BinanceAdapters.adaptSymbol(data.getSymbol(), true).equals(instrument))
.map(
transaction ->
transaction.toFundingRate((fundingRateInfoMap.getOrDefault(instrument, 8))));
return rawFundingRate(instrument);
}

private void updateFundingRateInfo() {
Expand Down Expand Up @@ -400,7 +384,7 @@ public void openSubscriptions(
productSubscription.getTicker().forEach(this::initTickerSubscription);
productSubscription.getOrderBook().forEach(this::initRawOrderBookUpdatesSubscription);
productSubscription.getTrades().forEach(this::initTradeSubscription);
productSubscription.getFundingRates().forEach(this::initTradeSubscription);
productSubscription.getFundingRates().forEach(this::initFundingRateSubscription);
}

private void initKlineSubscription(Instrument instrument, Set<KlineInterval> klineIntervals) {
Expand Down Expand Up @@ -525,6 +509,41 @@ private void initRawOrderBookUpdatesSubscription(Instrument instrument) {
instrument, triggerObservableBody(rawOrderBookUpdates(instrument)));
}

private void initFundingRateSubscription(Instrument instrument) {
fundingRateInfoSubscriptions.put(
instrument, triggerObservableBody(rawFundingRate(instrument)).share());
}

private Observable<FundingRate> rawFundingRate(Instrument instrument) {
if (!service.isLiveSubscriptionEnabled()
&& !service.getProductSubscription().getFundingRates().contains(instrument)) {
throw new UpFrontSubscriptionRequiredException();
}
return fundingRateInfoSubscriptions.computeIfAbsent(
instrument, s -> triggerObservableBody(rawFundingRateStream(instrument)).share());
}

private Observable<FundingRate> rawFundingRateStream(Instrument instrument) {
// init update info for funding rate interval
synchronized (this) {
if (fundingRateInfoUpdate == null) {
fundingRateInfoUpdate =
Observable.interval(10, 10, TimeUnit.MINUTES).subscribe(x -> updateFundingRateInfo());
updateFundingRateInfo();
}
}
return service
.subscribeChannel(
channelFromCurrency(instrument, BinanceSubscriptionType.FUNDING_RATES.getType()))
.map(
it ->
this.<FundingRateWebsocketTransaction>readTransaction(
it, FUNDING_RATE_TYPE, "funding rate"))
.map(BinanceWebsocketTransaction::getData)
.filter(data -> BinanceAdapters.adaptSymbol(data.getSymbol(), true).equals(instrument))
.map(transaction -> transaction.toFundingRate(fundingRateInfoMap.getOrDefault(instrument, 8)));
}

private Observable<BinanceTicker24h> rawTickerStream(Instrument instrument) {
return service
.subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.TICKER.getType()))
Expand Down Expand Up @@ -593,6 +612,7 @@ private Observable<BinanceBookTicker> rawBookTickerStream(Instrument instrument)
}

private final class OrderbookSubscription {

final Observable<DepthBinanceWebSocketTransaction> stream;
final AtomicLong lastUpdateId = new AtomicLong();
final AtomicLong snapshotLastUpdateId = new AtomicLong();
Expand All @@ -607,7 +627,9 @@ void invalidateSnapshot() {
}

void initSnapshotIfInvalid(Instrument instrument) {
if (snapshotLastUpdateId.get() != 0) return;
if (snapshotLastUpdateId.get() != 0) {
return;
}
try {
LOG.info("Fetching initial orderbook snapshot for {} ", instrument);
onApiCall.run();
Expand Down Expand Up @@ -867,6 +889,7 @@ private static JavaType getKlineType() {
*/
@SuppressWarnings("Convert2MethodRef")
private final class OrderBookFutureSubscription implements Disposable {

private final Instrument instrument;
private final Observable<DepthBinanceWebSocketTransaction> deltasObservable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ public BookTickerBinanceWebSocketTransaction(
@JsonProperty("b") BigDecimal bidPrice,
@JsonProperty("B") BigDecimal bidQty,
@JsonProperty("a") BigDecimal askPrice,
@JsonProperty("A") BigDecimal askQty) {
@JsonProperty("A") BigDecimal askQty,
@JsonProperty("E") long eventTime,
@JsonProperty("T") long transactionTime) {
super(BinanceWebSocketTypes.BOOK_TICKER, new Date());
ticker = new BinanceBookTicker(bidPrice, bidQty, askPrice, askQty, symbol);
ticker = new BinanceBookTicker(bidPrice, bidQty, askPrice, askQty, symbol, eventTime, transactionTime);
ticker.setUpdateId(updateId);
}
}