Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion hugegraph-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
<artifactId>hugegraph-common</artifactId>
<version>${hugegraph.common.version}</version>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class BatchExample {

public static void main(String[] args) {
// If connect failed will throw a exception.
// If connect failed will throw an exception.
HugeClient hugeClient = HugeClient.builder("http://localhost:8080",
"hugegraph").build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.serializer.direct;

import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.serializer.direct.struct.HugeType;
import com.baidu.hugegraph.serializer.direct.util.BytesBuffer;
import com.baidu.hugegraph.serializer.direct.util.GraphSchema;
import com.baidu.hugegraph.serializer.direct.util.Id;
import com.baidu.hugegraph.serializer.direct.util.IdGenerator;
import com.baidu.hugegraph.structure.GraphElement;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.schema.PropertyKey;

import java.util.Arrays;
import java.util.Map;

/**
* TODO: review later
*/
/**
 * Serializes client-side graph elements (vertices / edges) directly into
 * HBase (rowkey, value) byte arrays, bypassing the JSON/REST encoding path.
 *
 * TODO: review later
 */
public class HBaseSerializer {

    private int edgeLogicPartitions = 30;
    private int vertexLogicPartitions = 10;
    private HugeClient client;
    private GraphSchema graphSchema;

    public HBaseSerializer(HugeClient client, int vertexPartitions, int edgePartitions) {
        this.client = client;
        // Schema is fetched once from the server and cached for label/key lookups
        this.graphSchema = new GraphSchema(client);
        this.edgeLogicPartitions = edgePartitions;
        this.vertexLogicPartitions = vertexPartitions;
    }

    /**
     * Builds the HBase row key for the given element.
     *
     * Vertex key layout: partition(short) + id
     * Edge key layout:   partition(short) + sourceId + direction(OUT) +
     *                    edgeLabelId + sortKeys("") + targetId
     *
     * @return the row key bytes, or null if the element type is unknown
     *         (or a vertex without an id)
     */
    public byte[] getKeyBytes(GraphElement e) {
        byte[] array = null;
        // NOTE: compare String content with equals(), never with '=='
        if ("vertex".equals(e.type()) && e.id() != null) {
            BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + e.id().toString().length());
            buffer.writeShort(getPartition(HugeType.VERTEX, IdGenerator.of(e.id())));
            buffer.writeId(IdGenerator.of(e.id()));
            array = buffer.bytes();
        } else if ("edge".equals(e.type())) {
            BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
            Edge edge = (Edge) e;
            // Edges are partitioned by their source vertex id
            buffer.writeShort(getPartition(HugeType.EDGE, IdGenerator.of(edge.sourceId())));
            buffer.writeId(IdGenerator.of(edge.sourceId()));
            buffer.write(HugeType.EDGE_OUT.code());
            // NOTE(review): original carried a marker here ("error occurs");
            // verify getEdgeLabel() resolves for all labels used by callers
            buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
            // Empty sort-key placeholder, terminated per the id-string encoding
            buffer.writeStringWithEnding("");
            buffer.writeId(IdGenerator.of(edge.targetId()));
            array = buffer.bytes();
        }
        return array;
    }

    /**
     * Builds the HBase cell value for the given element.
     *
     * Vertex value layout: vertexLabelId + propsCount + (propKeyId, propValue)*
     * Edge value layout:   propsCount + (propKeyId, propValue)*
     *
     * @return the value bytes, or null if the element type is unknown
     */
    public byte[] getValueBytes(GraphElement e) {
        byte[] array = null;
        if ("vertex".equals(e.type())) {
            int propsCount = e.properties().size();
            // 16 bytes per property is a sizing estimate; BytesBuffer grows as needed
            BytesBuffer buffer = BytesBuffer.allocate(8 + 16 * propsCount);
            buffer.writeId(IdGenerator.of(graphSchema.getVertexLabel(e.label()).id()));
            buffer.writeVInt(propsCount);
            for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
                PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
                buffer.writeVInt(propertyKey.id().intValue());
                buffer.writeProperty(propertyKey.dataType(), entry.getValue());
            }
            array = buffer.bytes();
        } else if ("edge".equals(e.type())) {
            int propsCount = e.properties().size();
            BytesBuffer buffer = BytesBuffer.allocate(4 + 16 * propsCount);
            buffer.writeVInt(propsCount);
            for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
                PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
                buffer.writeVInt(propertyKey.id().intValue());
                buffer.writeProperty(propertyKey.dataType(), entry.getValue());
            }
            array = buffer.bytes();
        }
        return array;
    }

    /**
     * Maps an element id onto a logical partition for its type.
     *
     * @return a non-negative partition number in
     *         [0, edgeLogicPartitions) or [0, vertexLogicPartitions)
     */
    public short getPartition(HugeType type, Id id) {
        int hashcode = Arrays.hashCode(id.asBytes());
        short partition = 1;
        if (type.isEdge()) {
            partition = (short) (hashcode % edgeLogicPartitions);
        } else if (type.isVertex()) {
            partition = (short) (hashcode % vertexLogicPartitions);
        }
        // Java's % keeps the sign of the dividend, so fold negatives back
        return partition > 0 ? partition : (short) -partition;
    }

    public int getEdgeLogicPartitions() {
        return this.edgeLogicPartitions;
    }

    public int getVertexLogicPartitions() {
        return this.vertexLogicPartitions;
    }

    /** Closes the underlying HugeClient connection. */
    public void close() {
        this.client.close();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.serializer.direct;

/**
* TODO: review later
*
* In this serializer, we only support normal type now:
* - number
* - string
* And they will be transferred to bytes directly
**/
public class RocksDBSerializer {
    // TODO: Support write RocksDB directly (encode elements to rocksdb
    // key/value byte arrays, mirroring HBaseSerializer)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.serializer.direct.reuse;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.baidu.hugegraph.driver.GraphManager;
import com.baidu.hugegraph.driver.HugeClient;
import com.baidu.hugegraph.driver.SchemaManager;
import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
import com.baidu.hugegraph.serializer.direct.RocksDBSerializer;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.graph.Vertex;

/**
* @author jin
* This class is a demo for rocksdb/HBase put(rowkey, values) which use Client-Side's graph struct
* And we don't need to construct the graph element, just use it and transfer them to bytes array
* instead of json format
*/
/**
 * @author jin
 * Demo for RocksDB/HBase put(rowkey, values) using the client-side graph
 * structures: graph elements are serialized straight to byte arrays and
 * written to the backend, instead of being JSON-encoded and sent to the
 * server.
 */
public class BytesDemo {

    static HugeClient client;
    // true: serialize locally and write the backend directly;
    // false: send the elements through the server REST API
    boolean bypassServer = true;
    RocksDBSerializer ser;
    HBaseSerializer HBaseSer;

    public static void main(String[] args) {
        BytesDemo ins = new BytesDemo();
        ins.initGraph();
    }

    /**
     * Connects to the server, creates the demo schema, then writes a few
     * sample elements (see writeGraphElements) and closes the client.
     */
    void initGraph() {
        int edgeLogicPartitions = 16;
        int vertexLogicPartitions = 8;
        // If connect failed will throw an exception.
        client = HugeClient.builder("http://localhost:8081", "hugegraph").build();

        SchemaManager schema = client.schema();

        schema.propertyKey("name").asText().ifNotExist().create();
        schema.propertyKey("age").asInt().ifNotExist().create();
        schema.propertyKey("lang").asText().ifNotExist().create();
        schema.propertyKey("date").asText().ifNotExist().create();
        schema.propertyKey("price").asText().ifNotExist().create();

        schema.vertexLabel("person")
              .properties("name", "age")
              .useCustomizeStringId()
              .enableLabelIndex(false)
              .ifNotExist()
              .create();

        schema.vertexLabel("personB")
              .properties("price")
              .nullableKeys("price")
              .useCustomizeNumberId()
              .enableLabelIndex(false)
              .ifNotExist()
              .create();

        schema.vertexLabel("software")
              .properties("name", "lang", "price")
              .useCustomizeStringId()
              .enableLabelIndex(false)
              .ifNotExist()
              .create();

        schema.edgeLabel("knows")
              .link("person", "person")
              .properties("date")
              .enableLabelIndex(false)
              .ifNotExist()
              .create();

        schema.edgeLabel("created")
              .link("person", "software")
              .properties("date")
              .enableLabelIndex(false)
              .ifNotExist()
              .create();

        HBaseSer = new HBaseSerializer(client, vertexLogicPartitions, edgeLogicPartitions);
        writeGraphElements();

        client.close();
    }

    /**
     * Constructs a few sample vertices and one edge, then writes them
     * either directly to the backend or via the server, depending on
     * the bypassServer flag.
     */
    private void writeGraphElements() {
        GraphManager graph = client.graph();

        // Construct some vertices & edges
        Vertex peter = new Vertex("person");
        peter.property("name", "peter");
        peter.property("age", 35);
        peter.id("peter");

        Vertex lop = new Vertex("software");
        lop.property("name", "lop");
        lop.property("lang", "java");
        lop.property("price", "328");
        lop.id("lop");

        Vertex vadasB = new Vertex("personB");
        vadasB.property("price", "120");
        vadasB.id(12345);

        Edge peterCreateLop = new Edge("created").source(peter).target(lop)
                                                 .property("date", "2017-03-24");

        // Plain lists instead of double-brace initialization, which creates
        // anonymous inner classes holding a reference to the enclosing instance
        List<Vertex> vertices = new ArrayList<>();
        vertices.add(peter);
        vertices.add(lop);
        vertices.add(vadasB);

        List<Edge> edges = new ArrayList<>();
        edges.add(peterCreateLop);

        if (bypassServer) {
            // New way: encode to bytes locally and write the backend directly
            writeDirectly(vertices, edges);
        } else {
            // Old way: encode to json then send to server
            writeByServer(graph, vertices, edges);
        }
    }

    /*
     * Transfer each vertex & edge into a (rowkey, value) byte-array pair
     * and push it to HBase.
     * TODO: use a batch and send them together
     */
    void writeDirectly(List<Vertex> vertices, List<Edge> edges) {
        for (Vertex vertex : vertices) {
            byte[] rowkey = HBaseSer.getKeyBytes(vertex);
            byte[] values = HBaseSer.getValueBytes(vertex);
            sendRpcToHBase("vertex", rowkey, values);
        }

        for (Edge edge : edges) {
            byte[] rowkey = HBaseSer.getKeyBytes(edge);
            byte[] values = HBaseSer.getValueBytes(edge);
            sendRpcToHBase("edge", rowkey, values);
        }
    }

    boolean sendRpcToRocksDB(byte[] rowkey, byte[] values) {
        // here we call the rpc
        boolean flag = false;
        //flag = put(rowkey, values);
        return flag;
    }

    /** Writes the elements through the server REST API and prints the results. */
    void writeByServer(GraphManager graph, List<Vertex> vertices, List<Edge> edges) {
        vertices = graph.addVertices(vertices);
        vertices.forEach(System.out::println);

        edges = graph.addEdges(edges, false);
        edges.forEach(System.out::println);
    }

    /** @return whether the put succeeded (IOException is logged, not rethrown) */
    boolean sendRpcToHBase(String type, byte[] rowkey, byte[] values) {
        boolean flag = false;
        try {
            flag = put(type, rowkey, values);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    boolean put(String type, byte[] rowkey, byte[] values) throws IOException {
        // TODO: put to HBase
        return true;
    }
}
Loading