Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 76 additions & 64 deletions hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,8 @@ public void close() throws HugeException {
}
// Make sure that all transactions are closed in all threads
E.checkState(this.tx.closed(),
"Ensure tx closed in all threads when closing graph");
"Ensure tx closed in all threads when closing graph '%s'",
this.name);
}

public void closeTx() {
Expand Down Expand Up @@ -590,27 +591,25 @@ public static void shutdown(long timeout) {

private class TinkerpopTransaction extends AbstractThreadLocalTransaction {

private AtomicInteger refs;

private ThreadLocal<Boolean> opened;

// Times opened from upper layer
private final AtomicInteger refs;
// Flag opened of each thread
private final ThreadLocal<Boolean> opened;
// Backend transactions
private ThreadLocal<GraphTransaction> graphTransaction;
private ThreadLocal<SchemaTransaction> schemaTransaction;
private final ThreadLocal<Txs> transactions;

public TinkerpopTransaction(Graph graph) {
super(graph);

this.refs = new AtomicInteger(0);

this.refs = new AtomicInteger();
this.opened = ThreadLocal.withInitial(() -> false);
this.graphTransaction = ThreadLocal.withInitial(() -> null);
this.schemaTransaction = ThreadLocal.withInitial(() -> null);
this.transactions = ThreadLocal.withInitial(() -> null);
}

public boolean closed() {
assert this.refs.get() >= 0 : this.refs.get();
return this.refs.get() == 0;
int refs = this.refs.get();
assert refs >= 0 : refs;
return refs == 0;
}

/**
Expand Down Expand Up @@ -653,37 +652,28 @@ public boolean isOpen() {

@Override
protected void doOpen() {
this.schemaTransaction();
this.graphTransaction();

this.getOrNewTransaction();
this.setOpened();
}

@Override
protected void doCommit() {
this.verifyOpened();

this.schemaTransaction().commit();
this.graphTransaction().commit();
this.getOrNewTransaction().commit();
}

@Override
protected void doRollback() {
this.verifyOpened();

try {
this.graphTransaction().rollback();
} finally {
this.schemaTransaction().rollback();
}
this.getOrNewTransaction().rollback();
}

@Override
protected void doClose() {
this.verifyOpened();

try {
// Calling super will clear listeners
// Calling super.doClose() will clear listeners
super.doClose();
} finally {
this.resetState();
Expand All @@ -692,11 +682,8 @@ protected void doClose() {

@Override
public String toString() {
return String.format("TinkerpopTransaction{opened=%s, " +
"graphTx=%s, schemaTx=%s}",
this.opened.get(),
this.graphTransaction.get(),
this.schemaTransaction.get());
return String.format("TinkerpopTransaction{opened=%s, txs=%s}",
this.opened.get(), this.transactions.get());
}

private void verifyOpened() {
Expand Down Expand Up @@ -727,33 +714,27 @@ private void setClosed() {
}

private SchemaTransaction schemaTransaction() {
/*
* NOTE: this method may be called even tx is not opened,
* the reason is for reusing backend tx.
* so we don't call this.verifyOpened() here.
*/

SchemaTransaction schemaTx = this.schemaTransaction.get();
if (schemaTx == null) {
schemaTx = openSchemaTransaction();
this.schemaTransaction.set(schemaTx);
}
return schemaTx;
return this.getOrNewTransaction().schemaTx;
}

private GraphTransaction graphTransaction() {
return this.getOrNewTransaction().graphTx;
}

private Txs getOrNewTransaction() {
/*
* NOTE: this method may be called even tx is not opened,
* the reason is for reusing backend tx.
* so we don't call this.verifyOpened() here.
*/

GraphTransaction graphTx = this.graphTransaction.get();
if (graphTx == null) {
graphTx = openGraphTransaction();
this.graphTransaction.set(graphTx);
Txs txs = this.transactions.get();
if (txs == null) {
// TODO: close SchemaTransaction if GraphTransaction is error
txs = new Txs(openSchemaTransaction(), openGraphTransaction());
this.transactions.set(txs);
}
return graphTx;
return txs;
}

private void destroyTransaction() {
Expand All @@ -762,26 +743,57 @@ private void destroyTransaction() {
"Transaction should be closed before destroying");
}

GraphTransaction graphTx = this.graphTransaction.get();
if (graphTx != null) {
try {
graphTx.close();
} catch (Exception e) {
LOG.error("Failed to close GraphTransaction", e);
}
// Do close if needed, then remove the reference
Txs txs = this.transactions.get();
if (txs != null) {
txs.close();
}
this.transactions.remove();
}
}

private static final class Txs {

public final SchemaTransaction schemaTx;
public final GraphTransaction graphTx;

public Txs(SchemaTransaction schemaTx, GraphTransaction graphTx) {
assert schemaTx != null && graphTx != null;
this.schemaTx = schemaTx;
this.graphTx = graphTx;
}

public void commit() {
this.schemaTx.commit();
this.graphTx.commit();
}

public void rollback() {
try {
this.schemaTx.rollback();
} finally {
this.graphTx.rollback();
}
}

public void close() {
try {
this.graphTx.close();
} catch (Exception e) {
LOG.error("Failed to close GraphTransaction", e);
}

SchemaTransaction schemaTx = this.schemaTransaction.get();
if (schemaTx != null) {
try {
schemaTx.close();
} catch (Exception e) {
LOG.error("Failed to close SchemaTransaction", e);
}
try {
this.schemaTx.close();
} catch (Exception e) {
LOG.error("Failed to close SchemaTransaction", e);
}
}

this.graphTransaction.remove();
this.schemaTransaction.remove();
@Override
public String toString() {
return String.format("{schemaTx=%s,graphTx=%s}",
this.schemaTx, this.graphTx);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ protected Statement withAfterClasses(final Statement statement) {
public void evaluate() throws Throwable {
statement.evaluate();
GraphProvider gp = GraphManager.setGraphProvider(null);
((TestGraphProvider) gp).clearBackends();
((TestGraphProvider) gp).clear();
GraphManager.setGraphProvider(gp);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected Statement withAfterClasses(final Statement statement) {
public void evaluate() throws Throwable {
statement.evaluate();
GraphProvider gp = GraphManager.setGraphProvider(null);
((TestGraphProvider) gp).clearBackends();
((TestGraphProvider) gp).clear();
GraphManager.setGraphProvider(gp);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected Statement withAfterClasses(final Statement statement) {
public void evaluate() throws Throwable {
statement.evaluate();
GraphProvider gp = GraphManager.setGraphProvider(null);
((TestGraphProvider) gp).clearBackends();
((TestGraphProvider) gp).clear();
GraphManager.setGraphProvider(gp);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected Statement withAfterClasses(final Statement statement) {
public void evaluate() throws Throwable {
statement.evaluate();
GraphProvider gp = GraphManager.setGraphProvider(null);
((TestGraphProvider) gp).clearBackends();
((TestGraphProvider) gp).clear();
GraphManager.setGraphProvider(gp);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package com.baidu.hugegraph.tinkerpop;

import com.baidu.hugegraph.HugeFactory;
import org.apache.commons.configuration.Configuration;

import com.baidu.hugegraph.HugeFactory;

public class TestGraphFactory {

public static TestGraph open(Configuration config) {
return new TestGraph(HugeFactory.open(config));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,13 @@ public Graph openTestGraph(final Configuration config) {
@Override
public void clear(Graph graph, Configuration config) throws Exception {
TestGraph testGraph = (TestGraph) graph;
if (testGraph == null || !testGraph.initedBackend()) {
if (testGraph == null) {
return;
}
String graphName = config.getString(CoreOptions.STORE.name());
if (!testGraph.initedBackend()) {
testGraph.close();
}
if (testGraph.closed()) {
if (this.graphs.get(graphName) == testGraph) {
this.graphs.remove(graphName);
Expand All @@ -351,10 +354,18 @@ public void clear(Graph graph, Configuration config) throws Exception {
LOG.debug("Clear graph '{}'", graphName);
}

public void clearBackends() {
public void clear() {
for (TestGraph graph : this.graphs.values()) {
graph.clearBackend();
}
for (TestGraph graph : this.graphs.values()) {
try {
graph.close();
} catch (Exception e) {
LOG.error("Error while closing graph '{}'", graph, e);
}
}
this.graphs.clear();
}

@Watched
Expand Down