Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.baidu.hugegraph.backend.serializer.TableBackendEntry;
import com.baidu.hugegraph.backend.serializer.TableSerializer;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaElement;
import com.baidu.hugegraph.structure.HugeElement;
Expand All @@ -51,6 +52,10 @@

public class CassandraSerializer extends TableSerializer {

public CassandraSerializer(HugeConfig config) {
super(config);
}

@Override
public CassandraBackendEntry newBackendEntry(HugeType type, Id id) {
return new CassandraBackendEntry(type, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.cassandra.CassandraStore.CassandraGraphStore;
import com.baidu.hugegraph.backend.store.cassandra.CassandraStore.CassandraSchemaStore;
import com.baidu.hugegraph.config.HugeConfig;

public class CassandraStoreProvider extends AbstractBackendStoreProvider {

Expand All @@ -31,12 +32,12 @@ protected String keyspace() {
}

@Override
protected BackendStore newSchemaStore(String store) {
protected BackendStore newSchemaStore(HugeConfig config, String store) {
return new CassandraSchemaStore(this, this.keyspace(), store);
}

@Override
protected BackendStore newGraphStore(String store) {
protected BackendStore newGraphStore(HugeConfig config, String store) {
return new CassandraGraphStore(this, this.keyspace(), store);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,17 +447,17 @@ private void checkGraphNotClosed() {

private BackendStore loadSchemaStore() {
String name = this.configuration.get(CoreOptions.STORE_SCHEMA);
return this.storeProvider.loadSchemaStore(name);
return this.storeProvider.loadSchemaStore(this.configuration, name);
}

private BackendStore loadGraphStore() {
String name = this.configuration.get(CoreOptions.STORE_GRAPH);
return this.storeProvider.loadGraphStore(name);
return this.storeProvider.loadGraphStore(this.configuration, name);
}

private BackendStore loadSystemStore() {
String name = this.configuration.get(CoreOptions.STORE_SYSTEM);
return this.storeProvider.loadSystemStore(name);
return this.storeProvider.loadSystemStore(this.configuration, name);
}

@Watched
Expand Down Expand Up @@ -498,7 +498,7 @@ private BackendStoreProvider loadStoreProvider() {
private AbstractSerializer serializer() {
String name = this.configuration.get(CoreOptions.SERIALIZER);
LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name);
AbstractSerializer serializer = SerializerFactory.serializer(name);
AbstractSerializer serializer = SerializerFactory.serializer(this.configuration, name);
if (serializer == null) {
throw new HugeException("Can't load serializer with name " + name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@
import com.baidu.hugegraph.backend.query.IdQuery;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.type.HugeType;

public abstract class AbstractSerializer
implements GraphSerializer, SchemaSerializer {

public AbstractSerializer() {
// TODO: default constructor
}

public AbstractSerializer(HugeConfig config) {
// TODO: use the config
}

protected BackendEntry convertEntry(BackendEntry entry) {
return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ public class BinaryBackendEntry implements BackendEntry {
private boolean olap;

public BinaryBackendEntry(HugeType type, byte[] bytes) {
this(type, BytesBuffer.wrap(bytes).parseId(type));
this(type, BytesBuffer.wrap(bytes).parseId(type, false));
}

public BinaryBackendEntry(HugeType type, byte[] bytes, boolean enablePartition) {
this(type, BytesBuffer.wrap(bytes).parseId(type, enablePartition));
}

public BinaryBackendEntry(HugeType type, BinaryId id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.baidu.hugegraph.backend.id.IdGenerator;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendEntry.BackendColumn;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.structure.HugeProperty;
import com.baidu.hugegraph.structure.HugeVertex;
Expand All @@ -32,8 +33,8 @@

public class BinaryScatterSerializer extends BinarySerializer {

public BinaryScatterSerializer() {
super(true, true);
public BinaryScatterSerializer(HugeConfig config) {
super(true, true, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package com.baidu.hugegraph.backend.serializer;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;

import com.baidu.hugegraph.config.HugeConfig;
import org.apache.commons.lang.NotImplementedException;

import com.baidu.hugegraph.HugeGraph;
Expand Down Expand Up @@ -81,29 +79,54 @@ public class BinarySerializer extends AbstractSerializer {
*/
private final boolean keyWithIdPrefix;
private final boolean indexWithIdPrefix;
private final boolean enablePartition;

public BinarySerializer() {
this(true, true);
this(true, true, false);
}

public BinarySerializer(HugeConfig config) {
this(true, true, false);
}

public BinarySerializer(boolean keyWithIdPrefix,
boolean indexWithIdPrefix) {
boolean indexWithIdPrefix,
boolean enablePartition) {
this.keyWithIdPrefix = keyWithIdPrefix;
this.indexWithIdPrefix = indexWithIdPrefix;
this.enablePartition = enablePartition;
}

@Override
protected BinaryBackendEntry newBackendEntry(HugeType type, Id id) {
if (type.isVertex()) {
BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + id.length());
writePartitionedId(HugeType.VERTEX, id, buffer);
return new BinaryBackendEntry(type, new BinaryId(buffer.bytes(), id));
}

if (type.isEdge()) {
E.checkState(id instanceof BinaryId,
"Expect a BinaryId for BackendEntry with edge id");
return new BinaryBackendEntry(type, (BinaryId) id);
}

if (type.isIndex()) {
if (this.enablePartition) {
if (type.isStringIndex()) {
// TODO: add string index partition
}
if (type.isNumericIndex()) {
// TODO: add numeric index partition
}
}
BytesBuffer buffer = BytesBuffer.allocate(1 + id.length());
byte[] idBytes = buffer.writeIndexId(id, type).bytes();
return new BinaryBackendEntry(type, new BinaryId(idBytes, id));
}

BytesBuffer buffer = BytesBuffer.allocate(1 + id.length());
byte[] idBytes = type.isIndex() ?
buffer.writeIndexId(id, type).bytes() :
buffer.writeId(id).bytes();
byte[] idBytes = buffer.writeId(id).bytes();
return new BinaryBackendEntry(type, new BinaryId(idBytes, id));
}

Expand All @@ -112,8 +135,7 @@ protected final BinaryBackendEntry newBackendEntry(HugeVertex vertex) {
}

protected final BinaryBackendEntry newBackendEntry(HugeEdge edge) {
BinaryId id = new BinaryId(formatEdgeName(edge),
edge.idWithDirection());
BinaryId id = writeEdgeId(edge.idWithDirection());
return newBackendEntry(edge.type(), id);
}

Expand Down Expand Up @@ -224,12 +246,6 @@ protected void parseExpiredTime(BytesBuffer buffer, HugeElement element) {
element.expiredTime(buffer.readVLong());
}

protected byte[] formatEdgeName(HugeEdge edge) {
// owner-vertex + dir + edge-label + sort-values + other-vertex
return BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID)
.writeEdgeId(edge.id()).bytes();
}

protected byte[] formatEdgeValue(HugeEdge edge) {
int propsCount = edge.sizeOfProperties();
BytesBuffer buffer = BytesBuffer.allocate(4 + 16 * propsCount);
Expand Down Expand Up @@ -477,7 +493,8 @@ protected void parseVertexOlap(byte[] value, HugeVertex vertex) {
public BackendEntry writeEdge(HugeEdge edge) {
BinaryBackendEntry entry = newBackendEntry(edge);
byte[] name = this.keyWithIdPrefix ?
this.formatEdgeName(edge) : EMPTY_BYTES;
entry.id().asBytes() : EMPTY_BYTES;

byte[] value = this.formatEdgeValue(edge);
entry.column(name, value);

Expand Down Expand Up @@ -571,6 +588,10 @@ public BackendEntry writeId(HugeType type, Id id) {
protected Id writeQueryId(HugeType type, Id id) {
if (type.isEdge()) {
id = writeEdgeId(id);
} else if (type.isVertex()) {
BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + id.length());
writePartitionedId(HugeType.VERTEX, id, buffer);
id = new BinaryId(buffer.bytes(), id);
} else {
BytesBuffer buffer = BytesBuffer.allocate(1 + id.length());
id = new BinaryId(buffer.writeId(id).bytes(), id);
Expand Down Expand Up @@ -600,13 +621,12 @@ private Query writeQueryEdgeRangeCondition(ConditionQuery cq) {
}
Id label = cq.condition(HugeKeys.LABEL);

int size = 1 + vertex.length() + 1 + label.length() + 16;
BytesBuffer start = BytesBuffer.allocate(size);
start.writeId(vertex);
BytesBuffer start = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
writePartitionedId(HugeType.EDGE, vertex, start);
Copy link
Copy Markdown
Contributor Author

@JackyYangPassion JackyYangPassion Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for com.baidu.hugegraph.core.EdgeCoreTest test error

writeQueryEdgeRangeCondition() add writePartitionedId()

start.write(direction.type().code());
start.writeId(label);

BytesBuffer end = BytesBuffer.allocate(size);
BytesBuffer end = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
end.copyFrom(start);

RangeConditions range = new RangeConditions(sortValues);
Expand Down Expand Up @@ -655,7 +675,7 @@ private Query writeQueryEdgePrefixCondition(ConditionQuery cq) {

if (key == HugeKeys.OWNER_VERTEX ||
key == HugeKeys.OTHER_VERTEX) {
buffer.writeId((Id) value);
writePartitionedId(HugeType.EDGE, (Id) value, buffer);
} else if (key == HugeKeys.DIRECTION) {
byte t = ((Directions) value).type().code();
buffer.write(t);
Expand Down Expand Up @@ -800,18 +820,58 @@ private BinaryBackendEntry formatILDeletion(HugeIndex index) {
return entry;
}

private static BinaryId writeEdgeId(Id id) {
private BinaryId writeEdgeId(Id id) {
EdgeId edgeId;
if (id instanceof EdgeId) {
edgeId = (EdgeId) id;
} else {
edgeId = EdgeId.parse(id.asString());
}
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID)
.writeEdgeId(edgeId);
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
if (this.enablePartition) {
buffer.writeShort(getPartition(HugeType.EDGE, edgeId.ownerVertexId()));
buffer.writeEdgeId(edgeId);
} else {
buffer.writeEdgeId(edgeId);
}
return new BinaryId(buffer.bytes(), id);
}

private void writePartitionedId(HugeType type, Id id, BytesBuffer buffer) {
if (this.enablePartition) {
buffer.writeShort(getPartition(type, id));
buffer.writeId(id);
} else {
buffer.writeId(id);
}
}

protected short getPartition(HugeType type, Id id) {
return 0;
}

public BackendEntry parse(BackendEntry originEntry) {
Copy link
Copy Markdown
Contributor Author

@JackyYangPassion JackyYangPassion Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for serializer.BinarySerializerTest.testEdgeForPartition test error
add parse(BackendEntry originEntry) for deserialize edge BinanryBackendEntry

byte[] bytes = originEntry.id().asBytes();
BinaryBackendEntry parsedEntry = new BinaryBackendEntry(originEntry.type(),
bytes,
this.enablePartition);
if (this.enablePartition) {
bytes = Arrays.copyOfRange(bytes, parsedEntry.id().length() + 2, bytes.length);
} else {
bytes = Arrays.copyOfRange(bytes, parsedEntry.id().length(), bytes.length);
}
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
buffer.write(parsedEntry.id().asBytes());
buffer.write(bytes);
parsedEntry = new BinaryBackendEntry(originEntry.type(),
new BinaryId(buffer.bytes(),
BytesBuffer.wrap(buffer.bytes()).readEdgeId()));
for (BackendEntry.BackendColumn col : originEntry.columns()) {
parsedEntry.column(buffer.bytes(), col.value);
}
return parsedEntry;
}

private static Query prefixQuery(ConditionQuery query, Id prefix) {
Query newQuery;
if (query.paging() && !query.page().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ public Id readId(boolean big) {
}

public BytesBuffer writeEdgeId(Id id) {
// owner-vertex + dir + edge-label + sort-values + other-vertex
EdgeId edge = (EdgeId) id;
this.writeId(edge.ownerVertexId());
this.write(edge.directionCode());
Expand Down Expand Up @@ -767,11 +768,14 @@ public BinaryId asId() {
return new BinaryId(this.bytes(), null);
}

public BinaryId parseId(HugeType type) {
public BinaryId parseId(HugeType type, boolean enablePartition) {
if (type.isIndex()) {
return this.readIndexId(type);
}
// Parse id from bytes
if ((type.isVertex() || type.isEdge()) && enablePartition) {
this.readShort();
}
int start = this.buffer.position();
/*
* Since edge id in edges table doesn't prefix with leading 0x7e,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package com.baidu.hugegraph.backend.serializer;

import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.type.HugeType;

public class SerializerFactory {

Expand All @@ -32,15 +35,15 @@ public class SerializerFactory {
serializers = new ConcurrentHashMap<>();
}

public static AbstractSerializer serializer(String name) {
public static AbstractSerializer serializer(HugeConfig config, String name) {
name = name.toLowerCase();
switch (name) {
case "binary":
return new BinarySerializer();
return new BinarySerializer(config);
case "binaryscatter":
return new BinaryScatterSerializer();
return new BinaryScatterSerializer(config);
case "text":
return new TextSerializer();
return new TextSerializer(config);
default:
}

Expand All @@ -51,7 +54,7 @@ public static AbstractSerializer serializer(String name) {

assert AbstractSerializer.class.isAssignableFrom(clazz);
try {
return clazz.getConstructor().newInstance();
return clazz.getConstructor(HugeConfig.class).newInstance(config);
} catch (Exception e) {
throw new BackendException(e);
}
Expand Down
Loading