Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.SecurityContext;

import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.util.Log;
import com.codahale.metrics.annotation.Timed;
Expand Down Expand Up @@ -127,27 +124,7 @@ public void clear(@Context GraphManager manager,
throw new IllegalArgumentException(String.format(
"Please take the message: %s", CONFIRM_CLEAR));
}

// Clear vertex and edge
commit(g, () -> {
g.traversal().E().toStream().forEach(Edge::remove);
g.traversal().V().toStream().forEach(Vertex::remove);
});

// Schema operation will auto commit
SchemaManager schema = g.schema();
schema.getIndexLabels().forEach(elem -> {
schema.indexLabel(elem.name()).remove();
});
schema.getEdgeLabels().forEach(elem -> {
schema.edgeLabel(elem.name()).remove();
});
schema.getVertexLabels().forEach(elem -> {
schema.vertexLabel(elem.name()).remove();
});
schema.getPropertyKeys().forEach(elem -> {
schema.propertyKey(elem.name()).remove();
});
g.truncateBackend();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

truncate will delete system pk and vl, is it necessary?
Otherwise, counter will not be cleared

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it will delete system pk and vl.
counter should also be cleared. maybe some backends are forgotten to clear counter, but now they are all done.

}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void delete(@Context GraphManager manager,
HugeGraph g = graph(manager, graph);
HugeTaskScheduler scheduler = g.taskScheduler();

HugeTask task = scheduler.task(IdGenerator.of(id));
HugeTask<?> task = scheduler.task(IdGenerator.of(id));
if (task.completed()) {
scheduler.deleteTask(IdGenerator.of(id));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public void clearBackend() {
this.hugegraph.clearBackend();
}

public void truncateBackend() {
this.verifyPermission(ROLE_ADMIN);
this.hugegraph.truncateBackend();
}

public void restoring(boolean restoring) {
this.verifyPermission(ROLE_ADMIN);
this.hugegraph.restoring(restoring);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public void open() {
this.session = cluster().connect(keyspace());
}

public boolean opened() {
return this.session != null;
}

@Override
public boolean closed() {
if (this.session == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package com.baidu.hugegraph.backend.store.cassandra;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -242,12 +245,25 @@ public BackendFeatures features() {
@Override
public void init() {
this.checkClusterConnected();
this.initKeyspace();

if (this.sessions.session().opened()) {
// Session has ever been opened.
LOG.warn("Session has ever been opened(exist keyspace '{}' before)",
this.keyspace);
} else {
// Create keyspace if need
if (!this.existsKeyspace()) {
this.initKeyspace();
}
// Open session explicitly to get the exception when it fails
this.sessions.session().open();
}

// Create tables
this.checkSessionConnected();
this.initTables();

LOG.info("Store initialized: {}", this.store);
LOG.debug("Store initialized: {}", this.store);
}

@Override
Expand All @@ -260,7 +276,15 @@ public void clear() {
this.clearKeyspace();
}

LOG.info("Store cleared: {}", this.store);
LOG.debug("Store cleared: {}", this.store);
}

@Override
public void truncate() {
this.checkSessionConnected();

this.truncateTables();
LOG.debug("Store truncated: {}", this.store);
}

@Override
Expand Down Expand Up @@ -358,7 +382,8 @@ protected void initKeyspace() {
replication.putIfAbsent("replication_factor", factor);

Statement stmt = SchemaBuilder.createKeyspace(this.keyspace)
.ifNotExists().with().replication(replication);
.ifNotExists().with()
.replication(replication);

// Create keyspace with non-keyspace-session
LOG.debug("Create keyspace: {}", stmt);
Expand All @@ -370,7 +395,6 @@ protected void initKeyspace() {
session.close();
}
}
this.sessions.session().open();
}

protected void clearKeyspace() {
Expand All @@ -394,18 +418,29 @@ protected boolean existsKeyspace() {

protected void initTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables.values()) {
for (CassandraTable table : this.tables()) {
table.init(session);
}
}

protected void clearTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables.values()) {
for (CassandraTable table : this.tables()) {
table.clear(session);
}
}

protected void truncateTables() {
CassandraSessionPool.Session session = this.sessions.session();
for (CassandraTable table : this.tables()) {
table.truncate(session);
}
}

protected Collection<CassandraTable> tables() {
return this.tables.values();
}

protected final CassandraTable table(HugeType type) {
assert type != null;
CassandraTable table = this.tables.get(type);
Expand Down Expand Up @@ -464,19 +499,10 @@ public CassandraSchemaStore(BackendStoreProvider provider,
}

@Override
protected void initTables() {
super.initTables();

CassandraSessionPool.Session session = super.sessions.session();
this.counters.init(session);
}

@Override
protected void clearTables() {
super.clearTables();

CassandraSessionPool.Session session = super.sessions.session();
this.counters.clear(session);
protected Collection<CassandraTable> tables() {
List<CassandraTable> tables = new ArrayList<>(super.tables());
tables.add(this.counters);
return tables;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,4 +570,9 @@ protected void createIndex(CassandraSessionPool.Session session,
public void clear(CassandraSessionPool.Session session) {
this.dropTable(session);
}

public void truncate(CassandraSessionPool.Session session) {
LOG.debug("Truncate table: {}", this.table());
session.execute(QueryBuilder.truncate(this.table()));
}
}
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.4.9</version>
<version>1.5.1</version>
</dependency>

<!-- tinkerpop -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public void clearBackend() {
}
}

public void truncateBackend() {
this.storeProvider.truncate();
new BackendStoreInfo(this).init();
}

private SchemaTransaction openSchemaTransaction() throws HugeException {
try {
return new CachedSchemaTransaction(this, this.loadSchemaStore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ public Id nextId(HugeType type) {
}
return IdGenerator.of(counter.incrementAndGet());
}

public void reset() {
this.counters.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public void clear() {
this.store.clear();
}

@Override
public void truncate() {
this.store.truncate();
}

@Override
public void beginTx() {
this.store.beginTx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

Expand All @@ -33,16 +34,20 @@
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.event.EventListener;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.util.Events;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

public class CachedSchemaTransaction extends SchemaTransaction {

private final Cache idCache;
private final Cache nameCache;

private EventListener storeEventListener;
private EventListener cacheEventListener;

private final Map<HugeType, Boolean> cachedTypes;

public CachedSchemaTransaction(HugeGraph graph, BackendStore store) {
Expand All @@ -56,6 +61,12 @@ public CachedSchemaTransaction(HugeGraph graph, BackendStore store) {
this.listenChanges();
}

@Override
public void close() {
super.close();
this.unlistenChanges();
}

private Cache cache(String prefix) {
HugeConfig conf = super.graph().configuration();

Expand All @@ -66,47 +77,61 @@ private Cache cache(String prefix) {
}

private void listenChanges() {
// Listen store event: "store.init", "store.clear"
List<String> events = ImmutableList.of(Events.STORE_INIT,
Events.STORE_CLEAR);
super.store().provider().listen(event -> {
if (events.contains(event.name())) {
LOG.info("Clear cache on event '{}'", event.name());
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Events.STORE_CLEAR,
Events.STORE_TRUNCATE);
this.storeEventListener = event -> {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear cache on event '{}'",
this.graph(), event.name());
this.idCache.clear();
this.nameCache.clear();
this.cachedTypes.clear();
return true;
}
return false;
});
};
this.store().provider().listen(this.storeEventListener);

// Listen cache event: "cache"(invalid cache item)
EventHub schemaEventHub = super.graph().schemaEventHub();
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, event -> {
LOG.debug("Received event: {}", event);
event.checkArgs(String.class, Id.class);
Object[] args = event.args();
if (args[0].equals("invalid")) {
Id id = (Id) args[1];
Object value = this.idCache.get(id);
if (value != null) {
// Invalidate id cache
this.idCache.invalidate(id);

// Invalidate name cache
SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(),
schema.name());
this.nameCache.invalidate(prefixedName);
}
return true;
this.cacheEventListener = event -> {
LOG.debug("Graph {} received cache event: {}",
this.graph(), event);
event.checkArgs(String.class, Id.class);
Object[] args = event.args();
if (args[0].equals("invalid")) {
Id id = (Id) args[1];
Object value = this.idCache.get(id);
if (value != null) {
// Invalidate id cache
this.idCache.invalidate(id);

// Invalidate name cache
SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(),
schema.name());
this.nameCache.invalidate(prefixedName);
}
return false;
});
return true;
}
return false;
};
EventHub schemaEventHub = this.graph().schemaEventHub();
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
}
}

private void unlistenChanges() {
// Unlisten store event
this.store().provider().unlisten(this.storeEventListener);

// Unlisten cache event
EventHub schemaEventHub = this.graph().schemaEventHub();
schemaEventHub.unlisten(Events.CACHE, this.cacheEventListener);
}

private void resetCachedAllIfReachedCapacity() {
if (this.idCache.size() >= this.idCache.capacity()) {
LOG.warn("Schema cache reached capacity({}): {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ private BinaryBackendEntry formatILDeletion(HugeIndex index) {
case SEARCH_INDEX:
String idString = id.asString();
int idLength = idString.length();
// TODO: to improve, use BytesBuffer to generate a mask
for (int i = idLength - 1; i < 128; i++) {
BytesBuffer buffer = BytesBuffer.allocate(idLength + 1);
/*
Expand Down
Loading