Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public interface RevokingDatabase {

void setCursor(Chainbase.Cursor cursor, long offset);

void setSpecifiedCursor(Long specifiedSnapshotVersion);

void add(IRevokingDB revokingDB);

void merge() throws RevokingStoreIllegalStateException;
Expand All @@ -23,6 +25,8 @@ public interface RevokingDatabase {

void commit() throws RevokingStoreIllegalStateException;

boolean hasCommitted();

void pop() throws RevokingStoreIllegalStateException;

void fastPop() throws RevokingStoreIllegalStateException;
Expand Down
11 changes: 11 additions & 0 deletions chainbase/src/main/java/org/tron/core/db2/common/IRevokingDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Set;
import org.tron.core.db2.core.Chainbase;
import org.tron.core.db2.core.Snapshot;
import org.tron.core.exception.ItemNotFoundException;

public interface IRevokingDB extends Iterable<Map.Entry<byte[], byte[]>> {
Expand All @@ -29,6 +30,12 @@ public interface IRevokingDB extends Iterable<Map.Entry<byte[], byte[]>> {

void setCursor(Chainbase.Cursor cursor, long offset);

void setSpecifiedSnapshotVersion(Long specifiedSnapshotVersion);

void onSnapshotAdd(long snapshotVersion, Snapshot snapshot);

Snapshot onSnapshotRemove(long snapshotVersion);

Chainbase.Cursor getCursor();

// for blockstore
Expand All @@ -45,4 +52,8 @@ default Map<byte[], byte[]> getNext(byte[] key, long limit) {
return Collections.emptyMap();
}

Snapshot findSnapshot(byte[] key, byte[] value);

void printStats();

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ public abstract class AbstractSnapshot<K, V> implements Snapshot {

protected boolean isOptimized;

@Getter
@Setter
protected long snapVersion; // snapshot version

@Override
public Snapshot advance() {
return new SnapshotImpl(this);
public Snapshot advance(long newSnapVersion) {
return new SnapshotImpl(this, newSnapVersion);
}

@Override
Expand Down
98 changes: 81 additions & 17 deletions chainbase/src/main/java/org/tron/core/db2/core/Chainbase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,43 @@

import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import java.util.*;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.tron.common.utils.ByteUtil;
import org.tron.common.utils.Pair;
import org.tron.core.capsule.utils.MarketUtils;
import org.tron.core.db2.common.IRevokingDB;
import org.tron.core.db2.common.LevelDB;
import org.tron.core.db2.common.RocksDB;
import org.tron.core.db2.common.Value;
import org.tron.core.db2.common.*;
import org.tron.core.db2.common.Value.Operator;
import org.tron.core.db2.common.WrappedByteArray;
import org.tron.core.exception.ItemNotFoundException;


@Slf4j
public class Chainbase implements IRevokingDB {

// public static Map<String, byte[]> assetsAddress = new HashMap<>(); // key = name , value = address
public enum Cursor {
HEAD,
SOLIDITY,
PBFT
PBFT,
// add a specified block number cursor
SPECIFIED,
}

//true:fullnode, false:soliditynode
private ThreadLocal<Cursor> cursor = new ThreadLocal<>();
private ThreadLocal<Long> offset = new ThreadLocal<>();
private ThreadLocal<Long> specifiedSnapshotVersion = new ThreadLocal<>();
private ThreadLocal<Snapshot> specifiedSnapshot = new ThreadLocal<>();
private Snapshot head;
private Map<Long, Snapshot> snapshotSet = new HashMap<>();

public Chainbase(Snapshot head) {
this.head = head;
cursor.set(Cursor.HEAD);
offset.set(0L);
onSnapshotAdd(head.getSnapVersion(), head);
}

public String getDbName() {
Expand All @@ -58,6 +56,34 @@ public void setCursor(Cursor cursor, long offset) {
this.offset.set(offset);
}

@Override
public void setSpecifiedSnapshotVersion(Long specifiedSnapshotVersion) {
this.cursor.set(Cursor.SPECIFIED);
this.specifiedSnapshotVersion.set(specifiedSnapshotVersion);
if (specifiedSnapshotVersion == null) {
return;
}
Snapshot tmp = head;

while (tmp != null && tmp != tmp.getRoot()) {
if (tmp.getSnapVersion() == specifiedSnapshotVersion) {
this.specifiedSnapshot.set(tmp);
return;
}
tmp = tmp.getPrevious();
}
}

@Override
public void onSnapshotAdd(long snapshotVersion, Snapshot snapshot) {
snapshotSet.put(snapshotVersion, snapshot);
}

@Override
public Snapshot onSnapshotRemove(long snapshotVersion) {
return snapshotSet.remove(snapshotVersion);
}

@Override
public Cursor getCursor() {
if (cursor.get() == null) {
Expand Down Expand Up @@ -91,6 +117,12 @@ private Snapshot head() {
} else {
return head.getSolidity();
}
case SPECIFIED:
Snapshot curSnapshot = specifiedSnapshot.get();
if (curSnapshot != null) {
return curSnapshot;
}
return head;
default:
return head;
}
Expand Down Expand Up @@ -351,7 +383,7 @@ private Map<byte[], byte[]> getNext(Snapshot head, byte[] key, long limit) {

public Map<WrappedByteArray, byte[]> prefixQuery(byte[] key) {
Map<WrappedByteArray, byte[]> result = prefixQueryRoot(key);
Map<WrappedByteArray, byte[]> snapshot = prefixQuerySnapshot(key);
Map<WrappedByteArray, byte[]> snapshot = prefixQuerySnapshot(key);
result.putAll(snapshot);
result.entrySet().removeIf(e -> e.getValue() == null);
return result;
Expand All @@ -372,10 +404,42 @@ private Map<WrappedByteArray, byte[]> prefixQuerySnapshot(byte[] key) {
Snapshot snapshot = head();
if (!snapshot.equals(head.getRoot())) {
Map<WrappedByteArray, WrappedByteArray> all = new HashMap<>();
((SnapshotImpl) snapshot).collect(all, key);
((SnapshotImpl) snapshot).collect(all, key, snapshot);
all.forEach((k, v) -> result.put(k, v.getBytes()));
}
return result;
}


public Snapshot findSnapshot(byte[] key, byte[] value) {
Snapshot cur = head;
Value tmpValue;
while (Snapshot.isImpl(cur)) {
if ((tmpValue = ((SnapshotImpl) cur).db.get(Key.of(key))) != null
&& Arrays.equals(tmpValue.getBytes(), value)) {
return cur;
}
cur = cur.getPrevious();
}
return Snapshot.isImpl(cur) ? cur : null;
}

@Override
public void printStats() {
logger.info("[chainbase]snapshotSet size: {}", snapshotSet.size());
StringBuilder stringBuilder = new StringBuilder();
Snapshot tmp = head;
stringBuilder.append("[chainbase]version list: [");
while (tmp != null && tmp != tmp.getRoot()) {
stringBuilder.append(tmp.getSnapVersion()).append(" -> ");
tmp = tmp.getPrevious();
}
if (tmp != null) {
stringBuilder.append(tmp.getSnapVersion());
}
stringBuilder.append("]");
logger.info("[chainbase]db name={}, {}", getDbName(), stringBuilder.toString());

}

}
7 changes: 6 additions & 1 deletion chainbase/src/main/java/org/tron/core/db2/core/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ static boolean isRoot(Snapshot snapshot) {
static boolean isImpl(Snapshot snapshot) {
return snapshot != null && snapshot.getClass() == SnapshotImpl.class;
}
long getSnapVersion();

byte[] get(byte[] key);

Expand All @@ -21,7 +22,7 @@ static boolean isImpl(Snapshot snapshot) {

void merge(Snapshot from);

Snapshot advance();
Snapshot advance(long newSnapVersion);

Snapshot retreat();

Expand Down Expand Up @@ -50,4 +51,8 @@ static boolean isImpl(Snapshot snapshot) {
boolean isOptimized();

void reloadToMem();

void setCommitted();

boolean isCommitted();
}
47 changes: 30 additions & 17 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import com.google.common.primitives.Bytes;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import java.util.*;

import lombok.Getter;
import org.tron.core.db2.common.HashDB;
import org.tron.core.db2.common.Key;
Expand All @@ -23,7 +20,9 @@ public class SnapshotImpl extends AbstractSnapshot<Key, Value> {
@Getter
protected Snapshot root;

SnapshotImpl(Snapshot snapshot) {
private boolean isCommitted;

SnapshotImpl(Snapshot snapshot, long newSnapVersion) {
root = snapshot.getRoot();
synchronized (this) {
db = new HashDB(SnapshotImpl.class.getSimpleName() + ":" + root.getDbName());
Expand All @@ -34,6 +33,7 @@ public class SnapshotImpl extends AbstractSnapshot<Key, Value> {
if (isOptimized && root == previous) {
Streams.stream(root.iterator()).forEach( e -> put(e.getKey(),e.getValue()));
}
snapVersion = newSnapVersion;
}

@Override
Expand Down Expand Up @@ -86,18 +86,19 @@ public void remove(byte[] key) {
public void merge(Snapshot from) {
SnapshotImpl fromImpl = (SnapshotImpl) from;
Streams.stream(fromImpl.db).forEach(e -> db.put(e.getKey(), e.getValue()));
snapVersion = from.getSnapVersion();
}

public void mergeAhead(Snapshot from) {
if (from instanceof SnapshotRoot) {
return ;
return;
}
SnapshotImpl fromImpl = (SnapshotImpl) from;
Streams.stream(fromImpl.db).forEach(e -> {
if (db.get(e.getKey()) == null) {
db.put(e.getKey(), e.getValue());
}
}
if (db.get(e.getKey()) == null) {
db.put(e.getKey(), e.getValue());
}
}
);
}

Expand Down Expand Up @@ -135,13 +136,16 @@ synchronized void collect(Map<WrappedByteArray, WrappedByteArray> all) {
}
}

synchronized void collect(Map<WrappedByteArray, WrappedByteArray> all, byte[] prefix) {
synchronized void collect(Map<WrappedByteArray, WrappedByteArray> all, byte[] prefix, Snapshot head) {
Snapshot next = getRoot().getNext();
while (next != null) {
Streams.stream(((SnapshotImpl) next).db).filter(e -> Bytes.indexOf(
Objects.requireNonNull(e.getKey().getBytes()), prefix) == 0)
.forEach(e -> all.put(WrappedByteArray.of(e.getKey().getBytes()),
WrappedByteArray.of(e.getValue().getBytes())));
if (next == head) {
break;
}
next = next.getNext();
}
}
Expand All @@ -153,7 +157,7 @@ synchronized void collect(Map<WrappedByteArray, WrappedByteArray> all, byte[] pr
* So, if we use list, we need to exclude duplicate keys.
* More than that, there will be some item which has been deleted, but just assigned in Operator,
* so we need Operator value to determine next step.
* */
*/
synchronized void collectUnique(Map<WrappedByteArray, Operator> all) {
Snapshot next = getRoot().getNext();
while (next != null) {
Expand All @@ -165,7 +169,6 @@ synchronized void collectUnique(Map<WrappedByteArray, Operator> all) {
}



@Override
public void close() {
getRoot().close();
Expand Down Expand Up @@ -193,11 +196,21 @@ public String getDbName() {

@Override
public Snapshot newInstance() {
return new SnapshotImpl(this);
return new SnapshotImpl(this, this.getSnapVersion());
}

@Override
public void reloadToMem() {
mergeAhead(previous);
mergeAhead(previous);
}

@Override
public void setCommitted() {
isCommitted = true;
}

@Override
public boolean isCommitted() {
return isCommitted;
}
}
Loading