Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ private void tryOpen() {
assert this.session == null;
try {
this.open();
} catch (InvalidQueryException ignored) {}
} catch (InvalidQueryException ignored) {
// ignore
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ private Map<TokenRange, Long> getSubSplits(TokenRange tokenRange,
private Map<TokenRange, Set<Host>> getRangeMap() {
Metadata metadata = this.session.metadata();
return metadata.getTokenRanges().stream().collect(Collectors.toMap(
p -> p,
p -> metadata.getReplicas('"' + this.keyspace + '"', p)));
p -> p,
p -> metadata.getReplicas('"' + this.keyspace + '"', p)));
}

private static Map<TokenRange, Long> describeSplits(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,12 +475,12 @@ protected boolean existsKeyspace() {
}

protected boolean existsTable(String table) {
KeyspaceMetadata keyspace = this.cluster().getMetadata()
KeyspaceMetadata keyspace = this.cluster().getMetadata()
.getKeyspace(this.keyspace);
if (keyspace != null && keyspace.getTable(table) != null) {
return true;
}
return false;
if (keyspace != null && keyspace.getTable(table) != null) {
return true;
}
return false;
}

protected void initKeyspace() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ public Number queryNumber(CassandraSessionPool.Session session,
statement.setReadTimeoutMillis(timeout * 1000);
return session.query(statement);
}, (q, rs) -> {
Row row = rs.one();
if (row == null) {
return IteratorUtils.of(aggregate.defaultValue());
}
return IteratorUtils.of(row.getLong(0));
});
Row row = rs.one();
if (row == null) {
return IteratorUtils.of(aggregate.defaultValue());
}
return IteratorUtils.of(row.getLong(0));
});
return aggregate.reduce(results);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private double sum(RocksDBSessions.Session session, String property) {
double total = 0;
for (RocksDBSessions db : this.dbs) {
List<String> cfValues = db.property(property);
for(String value : cfValues) {
for (String value : cfValues) {
total += Double.parseDouble(value);
}
for (String table : db.openedTables()) {
Expand All @@ -212,7 +212,7 @@ private double sum(String property) {
double total = 0;
for (RocksDBSessions db : this.dbs) {
List<String> cfValues = db.property(property);
for(String value : cfValues) {
for (String value : cfValues) {
total += Double.parseDouble(value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ public static synchronized RocksDBOptions instance() {
public static final ConfigOption<Integer> MAX_BG_JOBS =
new ConfigOption<>(
"rocksdb.max_background_jobs",
"Maximum number of concurrent background jobs, including flushes and compactions.",
"Maximum number of concurrent background jobs, " +
"including flushes and compactions.",
rangeInt(1, Integer.MAX_VALUE),
8
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ public RocksDBSessions(HugeConfig config, String database, String store) {
public abstract Set<String> openedTables();

public abstract void createTable(String... tables) throws RocksDBException;

public abstract void dropTable(String... tables) throws RocksDBException;

public abstract boolean existsTable(String table);

public abstract List<String> property(String property);

public abstract void compactRange();

public abstract RocksDBSessions copy(HugeConfig config,
Expand All @@ -67,7 +70,7 @@ public abstract String hardLinkSnapshot(String snapshotPath)
/**
* Session for RocksDB
*/
public static abstract class Session extends AbstractBackendSession {
public abstract static class Session extends AbstractBackendSession {

public static final int SCAN_ANY = 0x80;
public static final int SCAN_PREFIX_BEGIN = 0x01;
Expand All @@ -78,29 +81,40 @@ public static abstract class Session extends AbstractBackendSession {
public static final int SCAN_LTE_END = 0x30;

public abstract String dataPath();

public abstract String walPath();

public abstract String property(String table, String property);

public abstract Pair<byte[], byte[]> keyRange(String table);

public abstract void compactRange(String table);

public abstract void put(String table, byte[] key, byte[] value);

public abstract void merge(String table, byte[] key, byte[] value);

public abstract void increase(String table, byte[] key, byte[] value);

public abstract void delete(String table, byte[] key);

public abstract void deleteSingle(String table, byte[] key);

public abstract void deletePrefix(String table, byte[] key);

public abstract void deleteRange(String table,
byte[] keyFrom, byte[] keyTo);

public abstract byte[] get(String table, byte[] key);

public abstract BackendColumnIterator get(String table,
List<byte[]> keys);

public abstract BackendColumnIterator scan(String table);

public abstract BackendColumnIterator scan(String table,
byte[] prefix);

public abstract BackendColumnIterator scan(String table,
byte[] keyFrom,
byte[] keyTo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public List<String> property(String property) {
}
}
return values;
} catch(RocksDBException | UnsupportedOperationException e) {
} catch (RocksDBException | UnsupportedOperationException e) {
throw new BackendException(e);
}
}
Expand Down Expand Up @@ -728,7 +728,8 @@ public String property(String table, String property) {

@Override
public Pair<byte[], byte[]> keyRange(String table) {
byte[] startKey, endKey;
byte[] startKey;
byte[] endKey;
try (CFHandle cf = cf(table);
RocksIterator iter = rocksdb().newIterator(cf.get())) {
iter.seekToFirst();
Expand Down Expand Up @@ -1058,14 +1059,14 @@ private boolean match(int expected) {
@SuppressWarnings("unused")
private void dump() {
this.seek();
System.out.println(">>>> scan from " + this.table + ": " +
(this.keyBegin == null ? "*" :
StringEncoding.format(this.keyBegin)) +
(this.iter.isValid() ? "" : " - No data"));
LOG.info(">>>> scan from {}: {}{}",
this.table,
this.keyBegin == null ? "*" : StringEncoding.format(this.keyBegin),
this.iter.isValid() ? "" : " - No data");
for (; this.iter.isValid(); this.iter.next()) {
System.out.println(String.format("%s=%s",
StringEncoding.format(this.iter.key()),
StringEncoding.format(this.iter.value())));
LOG.info("{}={}",
StringEncoding.format(this.iter.key()),
StringEncoding.format(this.iter.value()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public long getCounter(Session session, HugeType type) {
byte[] key = new byte[]{type.code()};
byte[] value = session.get(this.table(), key);
if (value != null) {
return l(value);
return toLong(value);
} else {
return 0L;
}
Expand All @@ -59,16 +59,16 @@ public long getCounter(Session session, HugeType type) {
public void increaseCounter(Session session, HugeType type,
long increment) {
byte[] key = new byte[]{type.code()};
session.increase(this.table(), key, b(increment));
session.increase(this.table(), key, toBytes(increment));
}

private static byte[] b(long value) {
private static byte[] toBytes(long value) {
return ByteBuffer.allocate(Long.BYTES)
.order(ByteOrder.nativeOrder())
.putLong(value).array();
}

private static long l(byte[] bytes) {
private static long toLong(byte[] bytes) {
assert bytes.length == Long.BYTES;
return ByteBuffer.wrap(bytes)
.order(ByteOrder.nativeOrder())
Expand Down Expand Up @@ -282,11 +282,15 @@ protected BackendColumnIterator queryByCond(Session session,
break;
case GTE:
minEq = true;
min = (Id) r.value();
break;
case GT:
min = (Id) r.value();
break;
case LTE:
maxEq = true;
max = (Id) r.value();
break;
case LT:
max = (Id) r.value();
break;
Expand Down Expand Up @@ -323,7 +327,7 @@ public RangeIntIndex(String store) {
}
}

public static class RangeFloatIndex extends RangeIndex{
public static class RangeFloatIndex extends RangeIndex {

public static final String TABLE = HugeType.RANGE_FLOAT_INDEX.string();

Expand All @@ -341,7 +345,7 @@ public RangeLongIndex(String store) {
}
}

public static class RangeDoubleIndex extends RangeIndex{
public static class RangeDoubleIndex extends RangeIndex {

public static final String TABLE = HugeType.RANGE_DOUBLE_INDEX.string();

Expand Down
Loading