diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java index d3efb5aa35..33c5cfa283 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java @@ -195,6 +195,12 @@ public Id addPropertyKey(PropertyKey key) { return this.hugegraph.addPropertyKey(key); } + @Override + public void updatePropertyKey(PropertyKey key) { + verifySchemaPermission(HugePermission.WRITE, key); + this.hugegraph.updatePropertyKey(key); + } + @Override public Id removePropertyKey(Id key) { PropertyKey pkey = this.hugegraph.propertyKey(key); @@ -240,6 +246,12 @@ public void addVertexLabel(VertexLabel label) { this.hugegraph.addVertexLabel(label); } + @Override + public void updateVertexLabel(VertexLabel label) { + verifySchemaPermission(HugePermission.WRITE, label); + this.hugegraph.updateVertexLabel(label); + } + @Override public Id removeVertexLabel(Id id) { VertexLabel label = this.hugegraph.vertexLabel(id); @@ -293,6 +305,12 @@ public void addEdgeLabel(EdgeLabel label) { this.hugegraph.addEdgeLabel(label); } + @Override + public void updateEdgeLabel(EdgeLabel label) { + verifySchemaPermission(HugePermission.WRITE, label); + this.hugegraph.updateEdgeLabel(label); + } + @Override public Id removeEdgeLabel(Id id) { EdgeLabel label = this.hugegraph.edgeLabel(id); @@ -339,6 +357,12 @@ public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { this.hugegraph.addIndexLabel(schemaLabel, indexLabel); } + @Override + public void updateIndexLabel(IndexLabel label) { + verifySchemaPermission(HugePermission.WRITE, label); + this.hugegraph.updateIndexLabel(label); + } + @Override public Id removeIndexLabel(Id id) { IndexLabel label = this.hugegraph.indexLabel(id); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java index 75bf12f0c8..147664312b 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraStore.java @@ -227,17 +227,25 @@ private void mutate(CassandraSessionPool.Session session, LOG.warn("The entry will be ignored due to no change: {}", entry); } + CassandraTable table; + if (!entry.olap()) { + // Oltp table + table = this.table(entry.type()); + } else { + if (entry.type().isIndex()) { + // Olap index + table = this.table(this.olapTableName(entry.type())); + } else { + // Olap vertex + table = this.table(this.olapTableName(entry.subId())); + } + } + switch (item.action()) { case INSERT: - // Insert olap vertex - if (entry.olap()) { - this.table(this.olapTableName(entry.subId())) - .insert(session, entry.row()); - break; - } // Insert entry if (entry.selfChanged()) { - this.table(entry.type()).insert(session, entry.row()); + table.insert(session, entry.row()); } // Insert sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { @@ -245,15 +253,9 @@ private void mutate(CassandraSessionPool.Session session, } break; case DELETE: - // Delete olap vertex index by index label - if (entry.olap()) { - this.table(this.olapTableName(entry.type())) - .delete(session, entry.row()); - break; - } // Delete entry if (entry.selfChanged()) { - this.table(entry.type()).delete(session, entry.row()); + table.delete(session, entry.row()); } // Delete sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { @@ -261,15 +263,9 @@ private void mutate(CassandraSessionPool.Session session, } break; case APPEND: - // Append olap vertex index - if (entry.olap()) { - this.table(this.olapTableName(entry.type())) - .append(session, entry.row()); - break; - } // Append entry if (entry.selfChanged()) { - this.table(entry.type()).append(session, entry.row()); + table.append(session, entry.row()); } // Append sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { @@ -279,13 +275,27 @@ private void mutate(CassandraSessionPool.Session session, case ELIMINATE: // Eliminate entry if (entry.selfChanged()) { - this.table(entry.type()).eliminate(session, entry.row()); + table.eliminate(session, entry.row()); } // Eliminate sub rows (edges) for (CassandraBackendEntry.Row row : entry.subRows()) { this.table(row.type()).eliminate(session, row); } break; + case UPDATE_IF_PRESENT: + if (entry.selfChanged()) { + // TODO: forward to master-writer node + table.updateIfPresent(session, entry.row()); + } + assert entry.subRows().isEmpty() : entry.subRows(); + break; + case UPDATE_IF_ABSENT: + if (entry.selfChanged()) { + // TODO: forward to master-writer node + table.updateIfAbsent(session, entry.row()); + } + assert entry.subRows().isEmpty() : entry.subRows(); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java index 548726b4a4..f9d65fe0ff 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java @@ -38,6 +38,7 @@ import com.baidu.hugegraph.backend.query.Aggregate; import com.baidu.hugegraph.backend.query.Condition; import com.baidu.hugegraph.backend.query.Condition.Relation; +import com.baidu.hugegraph.backend.query.IdQuery; import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.query.Query.Order; import com.baidu.hugegraph.backend.store.BackendEntry; @@ -46,6 +47,7 @@ import com.baidu.hugegraph.exception.NotFoundException; import com.baidu.hugegraph.exception.NotSupportException; import com.baidu.hugegraph.iterator.ExtendableIterator; +import com.baidu.hugegraph.iterator.WrappedIterator; import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.util.CopyUtil; @@ -97,6 +99,18 @@ protected void registerMetaHandlers() { }); } + @Override + public boolean queryExist(CassandraSessionPool.Session session, + CassandraBackendEntry.Row entry) { + Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id()); + Iterator iter = this.query(session, query); + try { + return iter.hasNext(); + } finally { + WrappedIterator.close(iter); + } + } + @Override public Number queryNumber(CassandraSessionPool.Session session, Query query) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index ca388899ce..18d9f795e5 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -72,6 +72,8 @@ public interface HugeGraph extends Graph { Id addPropertyKey(PropertyKey key); + void updatePropertyKey(PropertyKey key); + Id removePropertyKey(Id key); Id clearPropertyKey(PropertyKey propertyKey); @@ -84,7 +86,9 @@ public interface HugeGraph extends Graph { boolean existsPropertyKey(String key); - void addVertexLabel(VertexLabel vertexLabel); + void addVertexLabel(VertexLabel label); + + void updateVertexLabel(VertexLabel label); Id removeVertexLabel(Id label); @@ -100,7 +104,9 @@ public interface HugeGraph extends Graph { boolean existsLinkLabel(Id vertexLabel); - void addEdgeLabel(EdgeLabel edgeLabel); + void addEdgeLabel(EdgeLabel label); + + void updateEdgeLabel(EdgeLabel label); Id removeEdgeLabel(Id label); @@ -116,6 +122,8 @@ public interface HugeGraph extends Graph { void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel); + void updateIndexLabel(IndexLabel label); + Id removeIndexLabel(Id label); Id rebuildIndex(SchemaElement schema); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java index 217fc5cbba..72f8838899 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/StandardHugeGraph.java @@ -714,6 +714,12 @@ public Id addPropertyKey(PropertyKey pkey) { return this.schemaTransaction().addPropertyKey(pkey); } + @Override + public void updatePropertyKey(PropertyKey pkey) { + assert this.name.equals(pkey.graph().name()); + this.schemaTransaction().updatePropertyKey(pkey); + } + @Override public Id removePropertyKey(Id pkey) { if (this.propertyKey(pkey).olap()) { @@ -756,9 +762,15 @@ public boolean existsPropertyKey(String name) { } @Override - public void addVertexLabel(VertexLabel vertexLabel) { - assert this.name.equals(vertexLabel.graph().name()); - this.schemaTransaction().addVertexLabel(vertexLabel); + public void addVertexLabel(VertexLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().addVertexLabel(label); + } + + @Override + public void updateVertexLabel(VertexLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().updateVertexLabel(label); } @Override @@ -812,9 +824,15 @@ public boolean existsLinkLabel(Id vertexLabel) { } @Override - public void addEdgeLabel(EdgeLabel edgeLabel) { - assert this.name.equals(edgeLabel.graph().name()); - this.schemaTransaction().addEdgeLabel(edgeLabel); + public void addEdgeLabel(EdgeLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().addEdgeLabel(label); + } + + @Override + public void updateEdgeLabel(EdgeLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().updateEdgeLabel(label); } @Override @@ -863,6 +881,12 @@ public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { this.schemaTransaction().addIndexLabel(schemaLabel, indexLabel); } + @Override + public void updateIndexLabel(IndexLabel label) { + assert this.name.equals(label.graph().name()); + this.schemaTransaction().updateIndexLabel(label); + } + @Override public Id removeIndexLabel(Id id) { return this.schemaTransaction().removeIndexLabel(id); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java index 0e7a55328c..8c42b28950 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import com.baidu.hugegraph.HugeGraphParams; import com.baidu.hugegraph.backend.id.Id; @@ -209,6 +210,16 @@ private static Id generateId(HugeType type, String name) { return IdGenerator.of(type.string() + "-" + name); } + @Override + protected void updateSchema(SchemaElement schema, + Consumer updateCallback) { + super.updateSchema(schema, updateCallback); + + this.updateCache(schema); + + this.notifyChanges(Cache.ACTION_INVALIDED, schema.type(), schema.id()); + } + @Override protected void addSchema(SchemaElement schema) { super.addSchema(schema); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java index 7126e4a632..f8ec5fe238 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendTable.java @@ -66,6 +66,32 @@ protected void registerMetaHandlers() { // pass } + public void updateIfPresent(Session session, Entry entry) { + // TODO: use fine-grained row lock + synchronized (this.table) { + assert session == null || !session.hasChanges(); + if (this.queryExist(session, entry)) { + this.insert(session, entry); + if (session != null) { + session.commit(); + } + } + } + } + + public void updateIfAbsent(Session session, Entry entry) { + // TODO: use fine-grained row lock + synchronized (this.table) { + assert session == null || !session.hasChanges(); + if (!this.queryExist(session, entry)) { + this.insert(session, entry); + if (session != null) { + session.commit(); + } + } + } + } + /** * Mapping query-type to table-type * @param query origin query @@ -112,6 +138,8 @@ public static final String joinTableName(String prefix, String table) { public abstract Number queryNumber(Session session, Query query); + public abstract boolean queryExist(Session session, Entry entry); + public abstract void insert(Session session, Entry entry); public abstract void delete(Session session, Entry entry); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java index 5d90fc95b1..ead0553531 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBStore.java @@ -155,6 +155,12 @@ protected void mutate(BackendAction item) { LOG.debug("[store {}] eliminate entry: {}", this.store, entry); table.eliminate(null, entry); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(null, entry); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(null, entry); + break; default: throw new BackendException("Unsupported mutate type: %s", item.action()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java index 7d5583dffd..2280488589 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/memory/InMemoryDBTable.java @@ -50,6 +50,7 @@ import com.baidu.hugegraph.util.InsertionOrderUtil; import com.baidu.hugegraph.util.Log; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; public class InMemoryDBTable extends BackendTable { @@ -133,6 +134,12 @@ public void eliminate(BackendSession session, TextBackendEntry entry) { } } + @Override + public boolean queryExist(BackendSession session, TextBackendEntry entry) { + List ids = ImmutableList.of(entry.id()); + return !this.queryById(ids, this.store).isEmpty(); + } + @Override public Number queryNumber(BackendSession session, Query query) { Aggregate aggregate = query.aggregateNotNull(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java index 454ea2af4e..db7d54eaca 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/AbstractTransaction.java @@ -414,6 +414,16 @@ public void doRemove(BackendEntry entry) { this.doAction(Action.DELETE, entry); } + @Watched(prefix = "tx") + public void doUpdateIfPresent(BackendEntry entry) { + this.doAction(Action.UPDATE_IF_PRESENT, entry); + } + + @Watched(prefix = "tx") + public void doUpdateIfAbsent(BackendEntry entry) { + this.doAction(Action.UPDATE_IF_ABSENT, entry); + } + protected void doAction(Action action, BackendEntry entry) { LOG.debug("Transaction {} entry {}", action, entry); E.checkNotNull(entry, "entry"); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java index d0496333ef..77ebba1d8e 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/tx/SchemaTransaction.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.function.Consumer; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; @@ -124,10 +125,15 @@ public List getIndexLabels() { @Watched(prefix = "schema") public Id addPropertyKey(PropertyKey propertyKey) { this.addSchema(propertyKey); - if (propertyKey.olap()) { - return this.createOlapPk(propertyKey); + if (!propertyKey.olap()) { + return IdGenerator.ZERO; } - return IdGenerator.ZERO; + return this.createOlapPk(propertyKey); + } + + @Watched(prefix = "schema") + public void updatePropertyKey(PropertyKey propertyKey) { + this.updateSchema(propertyKey, null); } @Watched(prefix = "schema") @@ -185,6 +191,11 @@ public void addVertexLabel(VertexLabel vertexLabel) { this.addSchema(vertexLabel); } + @Watched(prefix = "schema") + public void updateVertexLabel(VertexLabel vertexLabel) { + this.updateSchema(vertexLabel, null); + } + @Watched(prefix = "schema") public VertexLabel getVertexLabel(Id id) { E.checkArgumentNotNull(id, "Vertex label id can't be null"); @@ -217,6 +228,11 @@ public void addEdgeLabel(EdgeLabel edgeLabel) { this.addSchema(edgeLabel); } + @Watched(prefix = "schema") + public void updateEdgeLabel(EdgeLabel edgeLabel) { + this.updateSchema(edgeLabel, null); + } + @Watched(prefix = "schema") public EdgeLabel getEdgeLabel(Id id) { E.checkArgumentNotNull(id, "Edge label id can't be null"); @@ -239,50 +255,53 @@ public Id removeEdgeLabel(Id id) { } @Watched(prefix = "schema") - public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) { - this.addSchema(indexLabel); - + public void addIndexLabel(SchemaLabel baseLabel, IndexLabel indexLabel) { /* - * Update index name in base-label(VL/EL) + * Create index and update index name in base-label(VL/EL) * TODO: should wrap update base-label and create index in one tx. */ - if (schemaLabel.equals(VertexLabel.OLAP_VL)) { + this.addSchema(indexLabel); + + if (baseLabel.equals(VertexLabel.OLAP_VL)) { return; } - // FIXME: move schemaLabel update into updateSchema() lock block instead - synchronized (schemaLabel) { - schemaLabel.addIndexLabel(indexLabel.id()); - this.updateSchema(schemaLabel); - } + this.updateSchema(baseLabel, schema -> { + // NOTE: Do schema update in the lock block + baseLabel.addIndexLabel(indexLabel.id()); + }); + } + + @Watched(prefix = "schema") + public void updateIndexLabel(IndexLabel indexLabel) { + this.updateSchema(indexLabel, null); } @Watched(prefix = "schema") public void removeIndexLabelFromBaseLabel(IndexLabel indexLabel) { HugeType baseType = indexLabel.baseType(); Id baseValue = indexLabel.baseValue(); - SchemaLabel schemaLabel; + SchemaLabel baseLabel; if (baseType == HugeType.VERTEX_LABEL) { - schemaLabel = this.getVertexLabel(baseValue); + baseLabel = this.getVertexLabel(baseValue); } else { assert baseType == HugeType.EDGE_LABEL; - schemaLabel = this.getEdgeLabel(baseValue); + baseLabel = this.getEdgeLabel(baseValue); } - if (schemaLabel == null) { + if (baseLabel == null) { LOG.info("The base label '{}' of index label '{}' " + "may be deleted before", baseValue, indexLabel); return; } - if (schemaLabel.equals(VertexLabel.OLAP_VL)) { + if (baseLabel.equals(VertexLabel.OLAP_VL)) { return; } - // FIXME: move schemaLabel update into updateSchema() lock block instead - synchronized (schemaLabel) { - schemaLabel.removeIndexLabel(indexLabel.id()); - this.updateSchema(schemaLabel); - } + this.updateSchema(baseLabel, schema -> { + // NOTE: Do schema update in the lock block + baseLabel.removeIndexLabel(indexLabel.id()); + }); } @Watched(prefix = "schema") @@ -368,8 +387,11 @@ public void updateSchemaStatus(SchemaElement schema, SchemaStatus status) { LOG.warn("Can't update schema '{}', it may be deleted", schema); return; } - schema.status(status); - this.updateSchema(schema); + + this.updateSchema(schema, schemaToUpdate -> { + // NOTE: Do schema update in the lock block + schema.status(status); + }); } @Watched(prefix = "schema") @@ -377,28 +399,52 @@ public boolean existsSchemaId(HugeType type, Id id) { return this.getSchema(type, id) != null; } - protected void updateSchema(SchemaElement schema) { - this.addSchema(schema); + protected void updateSchema(SchemaElement schema, + Consumer updateCallback) { + LOG.debug("SchemaTransaction update {} with id '{}'", + schema.type(), schema.id()); + this.saveSchema(schema, true, updateCallback); } protected void addSchema(SchemaElement schema) { LOG.debug("SchemaTransaction add {} with id '{}'", schema.type(), schema.id()); setCreateTimeIfNeeded(schema); + this.saveSchema(schema, false, null); + } - // System schema just put into SystemSchemaStore in memory - if (schema.longId() < 0L) { - this.systemSchemaStore.add(schema); - return; - } - + private void saveSchema(SchemaElement schema, boolean update, + Consumer updateCallback) { + // Lock for schema update LockUtil.Locks locks = new LockUtil.Locks(this.params().name()); try { - locks.lockWrites(LockUtil.hugeType2Group(schema.type()), - schema.id()); + locks.lockWrites(LockUtil.hugeType2Group(schema.type()), schema.id()); + + if (updateCallback != null) { + // NOTE: Do schema update in the lock block + updateCallback.accept(schema); + } + + // System schema just put into SystemSchemaStore in memory + if (schema.longId() < 0L) { + this.systemSchemaStore.add(schema); + return; + } + + BackendEntry entry = this.serialize(schema); + this.beforeWrite(); - this.doInsert(this.serialize(schema)); - this.indexTx.updateNameIndex(schema, false); + + if (update) { + this.doUpdateIfPresent(entry); + // TODO: also support updateIfPresent for index-update + this.indexTx.updateNameIndex(schema, false); + } else { + // TODO: support updateIfAbsentProperty (property: label name) + this.doUpdateIfAbsent(entry); + this.indexTx.updateNameIndex(schema, false); + } + this.afterWrite(); } finally { locks.unlock(); @@ -459,6 +505,11 @@ protected List getAllSchema(HugeType type) { List results = new ArrayList<>(); Query query = new Query(type); Iterator entries = this.query(query).iterator(); + /* + * Can use MapperIterator instead if don't need to debug: + * new MapperIterator<>(entries, entry -> this.deserialize(entry, type)) + * QueryResults.fillList(iter, results); + */ try { while (entries.hasNext()) { BackendEntry entry = entries.next(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java index 338b45fb2e..13b84d2be6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/EdgeLabelBuilder.java @@ -262,7 +262,7 @@ public EdgeLabel append() { edgeLabel.nullableKey(propertyKey.id()); } edgeLabel.userdata(this.userdata); - this.graph().addEdgeLabel(edgeLabel); + this.graph().updateEdgeLabel(edgeLabel); return edgeLabel; } @@ -280,7 +280,7 @@ public EdgeLabel eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); edgeLabel.removeUserdata(this.userdata); - this.graph().addEdgeLabel(edgeLabel); + this.graph().updateEdgeLabel(edgeLabel); return edgeLabel; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java index 44b21f4bec..ab00d7f81b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/IndexLabelBuilder.java @@ -287,8 +287,7 @@ public IndexLabel append() { this.checkStableVars(); Userdata.check(this.userdata, Action.APPEND); indexLabel.userdata(this.userdata); - SchemaLabel schemaLabel = indexLabel.baseLabel(); - this.graph().addIndexLabel(schemaLabel, indexLabel); + this.graph().updateIndexLabel(indexLabel); return indexLabel; } @@ -303,8 +302,7 @@ public IndexLabel eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); indexLabel.removeUserdata(this.userdata); - SchemaLabel schemaLabel = indexLabel.baseLabel(); - this.graph().addIndexLabel(schemaLabel, indexLabel); + this.graph().updateIndexLabel(indexLabel); return indexLabel; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java index 965b5b6fed..92941cfcc1 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/PropertyKeyBuilder.java @@ -194,7 +194,7 @@ public PropertyKey append() { Userdata.check(this.userdata, Action.APPEND); propertyKey.userdata(this.userdata); - this.graph().addPropertyKey(propertyKey); + this.graph().updatePropertyKey(propertyKey); return propertyKey; } @@ -209,7 +209,7 @@ public PropertyKey eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); propertyKey.removeUserdata(this.userdata); - this.graph().addPropertyKey(propertyKey); + this.graph().updatePropertyKey(propertyKey); return propertyKey; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java index 75fb3285c1..533245d717 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/schema/builder/VertexLabelBuilder.java @@ -228,7 +228,7 @@ public VertexLabel append() { vertexLabel.nullableKey(propertyKey.id()); } vertexLabel.userdata(this.userdata); - this.graph().addVertexLabel(vertexLabel); + this.graph().updateVertexLabel(vertexLabel); return vertexLabel; } @@ -246,7 +246,7 @@ public VertexLabel eliminate() { Userdata.check(this.userdata, Action.ELIMINATE); vertexLabel.removeUserdata(this.userdata); - this.graph().addVertexLabel(vertexLabel); + this.graph().updateVertexLabel(vertexLabel); return vertexLabel; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java index 8411e929ea..255627d53c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/type/define/Action.java @@ -27,7 +27,11 @@ public enum Action implements SerialEnum { ELIMINATE(3, "eliminate"), - DELETE(4, "delete"); + DELETE(4, "delete"), + + UPDATE_IF_PRESENT(5, "update_if_present"), + + UPDATE_IF_ABSENT(6, "update_if_absent"); private final byte code; private final String name; @@ -61,6 +65,10 @@ public static Action fromCode(byte code) { return ELIMINATE; case 4: return DELETE; + case 5: + return UPDATE_IF_PRESENT; + case 6: + return UPDATE_IF_ABSENT; default: throw new AssertionError("Unsupported action code: " + code); } diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java index 628048a5d7..9cd5c233a4 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseStore.java @@ -215,6 +215,12 @@ private void mutate(Session session, BackendAction item) { case ELIMINATE: table.eliminate(session, entry); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(session, entry); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(session, entry); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java index 3cca9a5dcc..b4009dd9c5 100644 --- a/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java +++ b/hugegraph-hbase/src/main/java/com/baidu/hugegraph/backend/store/hbase/HbaseTable.java @@ -144,6 +144,14 @@ public void eliminate(Session session, BackendEntry entry) { this.delete(session, entry); } + @Override + public boolean queryExist(Session session, BackendEntry entry) { + Id id = entry.id(); + try (RowIterator iter = this.queryById(session, id)) { + return iter.hasNext(); + } + } + @Override public Number queryNumber(Session session, Query query) { Aggregate aggregate = query.aggregateNotNull(); diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java index 858b22edb5..90318c8db2 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlStore.java @@ -260,6 +260,12 @@ private void mutate(Session session, BackendAction item) { case ELIMINATE: table.eliminate(session, entry.row()); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(session, entry.row()); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(session, entry.row()); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java index f872b6f56a..69efd96bda 100644 --- a/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java +++ b/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlTable.java @@ -39,6 +39,7 @@ import com.baidu.hugegraph.backend.query.Aggregate; import com.baidu.hugegraph.backend.query.Condition; import com.baidu.hugegraph.backend.query.ConditionQuery; +import com.baidu.hugegraph.backend.query.IdQuery; import com.baidu.hugegraph.backend.query.Query; import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendTable; @@ -48,6 +49,8 @@ import com.baidu.hugegraph.backend.store.mysql.MysqlSessions.Session; import com.baidu.hugegraph.exception.NotFoundException; import com.baidu.hugegraph.iterator.ExtendableIterator; +import com.baidu.hugegraph.iterator.WrappedIterator; +import com.baidu.hugegraph.type.HugeType; import com.baidu.hugegraph.type.define.HugeKeys; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Log; @@ -60,10 +63,12 @@ public abstract class MysqlTable private static final String DECIMAL = "DECIMAL"; - // The template for insert and delete statements + // The template cache for insert and delete statements private String insertTemplate; private String insertTemplateTtl; private String deleteTemplate; + private String updateIfPresentTemplate; + private String updateIfAbsentTemplate; private final MysqlShardSplitter shardSplitter; @@ -72,6 +77,9 @@ public MysqlTable(String table) { this.insertTemplate = null; this.insertTemplateTtl = null; this.deleteTemplate = null; + this.updateIfPresentTemplate = null; + this.updateIfAbsentTemplate = null; + this.shardSplitter = new MysqlShardSplitter(this.table()); } @@ -175,31 +183,56 @@ protected List idColumnValue(Id id) { return ImmutableList.of(id.asObject()); } - protected String buildInsertTemplate(MysqlBackendEntry.Row entry) { - if (entry.ttl() != 0L) { - return this.buildInsertTemplateWithTtl(entry); - } - if (this.insertTemplate != null) { - return this.insertTemplate; + protected void insertOrUpdate(Session session, String template, + List params) { + PreparedStatement insertStmt; + try { + // Create or get insert prepare statement + insertStmt = session.prepareStatement(template); + int i = 1; + for (Object param : params) { + insertStmt.setObject(i++, param); + } + } catch (SQLException e) { + throw new BackendException("Failed to prepare statement '%s' " + + "with params: %s", template, params); } - - this.insertTemplate = this.buildInsertTemplateForce(entry); - return this.insertTemplate; + session.add(insertStmt); } - protected String buildInsertTemplateWithTtl(MysqlBackendEntry.Row entry) { - assert entry.ttl() != 0L; - if (this.insertTemplateTtl != null) { + protected final String buildUpdateTemplate(MysqlBackendEntry.Row entry) { + if (entry.ttl() != 0L) { + if (this.insertTemplateTtl != null) { + return this.insertTemplateTtl; + } + + this.insertTemplateTtl = this.buildUpdateForcedTemplate(entry); return this.insertTemplateTtl; + } else { + if (this.insertTemplate != null) { + return this.insertTemplate; + } + + this.insertTemplate = this.buildUpdateForcedTemplate(entry); + return this.insertTemplate; } + } - this.insertTemplateTtl = this.buildInsertTemplateForce(entry); - return this.insertTemplateTtl; + protected String buildUpdateForcedTemplate(MysqlBackendEntry.Row entry) { + StringBuilder insert = new StringBuilder(); + insert.append("REPLACE INTO ").append(this.table()); + return this.buildInsertKeys(insert, entry); } - protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { + protected String buildUpdateIfAbsentTemplate(MysqlBackendEntry.Row entry) { StringBuilder insert = new StringBuilder(); - insert.append("REPLACE INTO ").append(this.table()).append(" ("); + insert.append("INSERT IGNORE INTO ").append(this.table()); + return this.buildInsertKeys(insert, entry); + } + + protected String buildInsertKeys(StringBuilder insert, + MysqlBackendEntry.Row entry) { + insert.append(" ("); int i = 0; int n = entry.columns().size(); @@ -210,7 +243,7 @@ protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { } } insert.append(") VALUES ("); - // Fill with '?' + // Fill with '?' as a placeholder for (i = 0; i < n; i++) { insert.append("?"); if (i != n - 1) { @@ -222,11 +255,76 @@ protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { return insert.toString(); } - protected String buildDeleteTemplate(List idNames) { - if (this.deleteTemplate != null) { - return this.deleteTemplate; + protected List buildUpdateForcedParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry); + } + + protected List buildUpdateIfAbsentParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry); + } + + protected List buildColumnsParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry, null); + } + + protected List buildColumnsParams(MysqlBackendEntry.Row entry, + List skipKeys) { + List objects = new ArrayList<>(); + for (Map.Entry e : entry.columns().entrySet()) { + HugeKeys key = e.getKey(); + Object value = e.getValue(); + if (skipKeys != null && skipKeys.contains(key)) { + continue; + } + String type = this.tableDefine().columns().get(key); + if (type.startsWith(DECIMAL)) { + value = new BigDecimal(value.toString()); + } + objects.add(value); } + return objects; + } + + protected String buildUpdateIfPresentTemplate(MysqlBackendEntry.Row entry) { + + StringBuilder update = new StringBuilder(); + update.append("UPDATE ").append(this.table()); + update.append(" SET "); + + List idNames = this.idColumnName(); + int i = 0; + int size = entry.columns().size(); + for (HugeKeys key : entry.columns().keySet()) { + if (idNames.contains(key)) { + size--; + continue; + } + update.append(formatKey(key)); + update.append("=?"); + if (++i != size) { + update.append(", "); + } + } + + WhereBuilder where = this.newWhereBuilder(); + where.and(formatKeys(idNames), "="); + update.append(where.build()); + + return update.toString(); + } + + protected List buildUpdateIfPresentParams(MysqlBackendEntry.Row entry) { + List idNames = this.idColumnName(); + List params = this.buildColumnsParams(entry, idNames); + + List idValues = this.idColumnValue(entry); + params.addAll(idValues); + + return params; + } + + protected String buildDeleteTemplate(List idNames) { StringBuilder delete = new StringBuilder(); delete.append("DELETE FROM ").append(this.table()); this.appendPartition(delete); @@ -235,8 +333,7 @@ protected String buildDeleteTemplate(List idNames) { where.and(formatKeys(idNames), "="); delete.append(where.build()); - this.deleteTemplate = delete.toString(); - return this.deleteTemplate; + return delete.toString(); } protected String buildDropTemplate() { @@ -256,40 +353,21 @@ protected void appendPartition(StringBuilder sb) { */ @Override public void insert(Session session, MysqlBackendEntry.Row entry) { - String template = this.buildInsertTemplate(entry); - - PreparedStatement insertStmt; - try { - // Create or get insert prepare statement - insertStmt = session.prepareStatement(template); - int i = 1; - for (Object object : this.buildInsertObjects(entry)) { - insertStmt.setObject(i++, object); - } - } catch (SQLException e) { - throw new BackendException("Failed to prepare statement '%s'" + - "for entry: %s", template, entry); - } - session.add(insertStmt); - } - - protected List buildInsertObjects(MysqlBackendEntry.Row entry) { - List objects = new ArrayList<>(); - for (Map.Entry e : entry.columns().entrySet()) { - Object value = e.getValue(); - String type = this.tableDefine().columns().get(e.getKey()); - if (type.startsWith(DECIMAL)) { - value = new BigDecimal(value.toString()); - } - objects.add(value); - } - return objects; + String template = this.buildUpdateTemplate(entry); + List params = this.buildUpdateForcedParams(entry); + this.insertOrUpdate(session, template, params); } @Override public void delete(Session session, MysqlBackendEntry.Row entry) { List idNames = this.idColumnName(); - String template = this.buildDeleteTemplate(idNames); + + String template = this.deleteTemplate; + if (template == null) { + template = this.buildDeleteTemplate(idNames); + this.deleteTemplate = template; + } + PreparedStatement deleteStmt; try { deleteStmt = session.prepareStatement(template); @@ -328,6 +406,39 @@ public void eliminate(Session session, MysqlBackendEntry.Row entry) { this.delete(session, entry); } + @Override + public void updateIfPresent(Session session, MysqlBackendEntry.Row entry) { + String template = this.updateIfPresentTemplate; + if (template == null) { + template = this.buildUpdateIfPresentTemplate(entry); + this.updateIfPresentTemplate = template; + } + List params = this.buildUpdateIfPresentParams(entry); + this.insertOrUpdate(session, template, params); + } + + @Override + public void updateIfAbsent(Session session, MysqlBackendEntry.Row entry) { + String template = this.updateIfAbsentTemplate; + if (template == null) { + template = this.buildUpdateIfAbsentTemplate(entry); + this.updateIfAbsentTemplate = template; + } + List params = this.buildUpdateIfAbsentParams(entry); + this.insertOrUpdate(session, template, params); + } + + @Override + public boolean queryExist(Session session, MysqlBackendEntry.Row entry) { + Query query = new IdQuery.OneIdQuery(HugeType.UNKNOWN, entry.id()); + Iterator iter = this.query(session, query); + try { + return iter.hasNext(); + } finally { + WrappedIterator.close(iter); + } + } + @Override public Number queryNumber(Session session, Query query) { Aggregate aggregate = query.aggregateNotNull(); @@ -353,8 +464,8 @@ public Iterator query(Session session, Query query) { } protected Iterator query(Session session, Query query, - BiFunction> - parser) { + BiFunction> parser) { ExtendableIterator rs = new ExtendableIterator<>(); if (query.limit() == 0L && !query.noLimit()) { diff --git a/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java b/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java index f811274c48..e3b0bea0be 100644 --- a/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java +++ b/hugegraph-postgresql/src/main/java/com/baidu/hugegraph/backend/store/postgresql/PostgresqlTable.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Strings; +import com.baidu.hugegraph.backend.serializer.TableBackendEntry.Row; import com.baidu.hugegraph.backend.store.mysql.MysqlBackendEntry; import com.baidu.hugegraph.backend.store.mysql.MysqlSessions.Session; import com.baidu.hugegraph.backend.store.mysql.MysqlTable; @@ -54,15 +55,31 @@ protected String engine(Session session) { } @Override - protected List buildInsertObjects(MysqlBackendEntry.Row entry) { - List objects = new ArrayList<>(); - objects.addAll(super.buildInsertObjects(entry)); - objects.addAll(super.buildInsertObjects(entry)); - return objects; + protected String buildUpdateForcedTemplate(MysqlBackendEntry.Row entry) { + return this.buildInsertKeys(entry, false); } @Override - protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { + protected List buildUpdateForcedParams(MysqlBackendEntry.Row entry) { + List params = new ArrayList<>(); + List allColumns = this.buildColumnsParams(entry); + params.addAll(allColumns); + params.addAll(allColumns); + return params; + } + + @Override + protected String buildUpdateIfAbsentTemplate(Row entry) { + return this.buildInsertKeys(entry, true); + } + + @Override + protected List buildUpdateIfAbsentParams(MysqlBackendEntry.Row entry) { + return this.buildColumnsParams(entry); + } + + protected String buildInsertKeys(MysqlBackendEntry.Row entry, + boolean ignoreConflicts) { StringBuilder insert = new StringBuilder(); insert.append("INSERT INTO ").append(this.table()).append(" ("); @@ -95,13 +112,17 @@ protected String buildInsertTemplateForce(MysqlBackendEntry.Row entry) { } insert.append(")"); - i = 0; - size = entry.columns().keySet().size(); - insert.append(" DO UPDATE SET "); - for (HugeKeys key : entry.columns().keySet()) { - insert.append(formatKey(key)).append(" = ?"); - if (++i != size) { - insert.append(", "); + if (ignoreConflicts) { + insert.append(" DO NOTHING"); + } else { + i = 0; + size = entry.columns().keySet().size(); + insert.append(" DO UPDATE SET "); + for (HugeKeys key : entry.columns().keySet()) { + insert.append(formatKey(key)).append(" = ?"); + if (++i != size) { + insert.append(", "); + } } } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index cd14d3e6da..56230ac931 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -473,6 +473,12 @@ private void mutate(Session session, BackendAction item) { case ELIMINATE: table.eliminate(session, entry); break; + case UPDATE_IF_PRESENT: + table.updateIfPresent(session, entry); + break; + case UPDATE_IF_ABSENT: + table.updateIfAbsent(session, entry); + break; default: throw new AssertionError(String.format( "Unsupported mutate action: %s", item.action())); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java index 427161bfff..48c9a1af1a 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java @@ -121,6 +121,14 @@ public void eliminate(Session session, BackendEntry entry) { this.delete(session, entry); } + @Override + public boolean queryExist(Session session, BackendEntry entry) { + Id id = entry.id(); + try (BackendColumnIterator iter = this.queryById(session, id)) { + return iter.hasNext(); + } + } + @Override public Number queryNumber(Session session, Query query) { Aggregate aggregate = query.aggregateNotNull();