diff --git a/hugegraph-pd/pom.xml b/hugegraph-pd/pom.xml index 335fb0f375..b5db57f267 100644 --- a/hugegraph-pd/pom.xml +++ b/hugegraph-pd/pom.xml @@ -141,6 +141,7 @@ *.tar.gz .flattened-pom.xml + dist/** false diff --git a/hugegraph-store/hg-store-cli/pom.xml b/hugegraph-store/hg-store-cli/pom.xml new file mode 100644 index 0000000000..8757a09c8f --- /dev/null +++ b/hugegraph-store/hg-store-cli/pom.xml @@ -0,0 +1,82 @@ + + + + + + org.apache.hugegraph + hugegraph-store + ${revision} + ../pom.xml + + 4.0.0 + + hg-store-cli + + + + org.springframework.boot + spring-boot-starter + 2.5.14 + + + org.springframework.boot + spring-boot-starter-logging + + + + + org.apache.hugegraph + hg-store-client + ${revision} + + + org.apache.hugegraph + hg-store-grpc + ${revision} + + + org.apache.hugegraph + hg-pd-client + ${revision} + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.5.14 + + + + repackage + + + + org.apache.hugegraph.store.cli.StoreConsoleApplication + + + + + + + + diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/AppConfig.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/AppConfig.java new file mode 100644 index 0000000000..31ef32222b --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/AppConfig.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import lombok.Data; + +@Data +@Component +public class AppConfig { + + @Value("${pd.address}") + private String pdAddress; + + @Value("${net.kv.scanner.page.size}") + private int scannerPageSize; + + @Value("${scanner.graph}") + private String scannerGraph; + + @Value("${scanner.table}") + private String scannerTable; + + @Value("${scanner.max}") + private int scannerMax; + + @Value("${scanner.mod}") + private int scannerModNumber; + + @Value("${committer.graph}") + private String committerGraph; + + @Value("${committer.table}") + private String committerTable; + + @Value("${committer.amount}") + private int committerAmount; +} diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/StoreConsoleApplication.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/StoreConsoleApplication.java new file mode 100644 index 0000000000..32949711f8 --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/StoreConsoleApplication.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli; + +import java.io.IOException; + +import org.apache.hugegraph.pd.client.PDConfig; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.store.HgStoreClient; +import org.apache.hugegraph.store.cli.loader.HgThread2DB; +import org.apache.hugegraph.store.cli.scan.GrpcShardScanner; +import org.apache.hugegraph.store.cli.scan.HgStoreScanner; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import lombok.extern.slf4j.Slf4j; + +/** + * 2022/2/14 + */ +@SpringBootApplication +@Slf4j +public class StoreConsoleApplication implements CommandLineRunner { + + // TODO: this package seems to have many useless class and code, need to be updated. + @Autowired + private AppConfig appConfig; + + public static void main(String[] args) { + log.info("Starting StoreConsoleApplication"); + SpringApplication.run(StoreConsoleApplication.class, args); + log.info("StoreConsoleApplication finished."); + } + + @Override + public void run(String... args) throws IOException, InterruptedException, PDException { + if (args.length <= 0) { + log.warn("参数类型 cmd[-load, -query, -scan]"); + } else { + switch (args[0]) { + case "-load": + HgThread2DB hgThread2DB = new HgThread2DB(args[1]); + if (!args[3].isEmpty()) { + hgThread2DB.setGraphName(args[3]); + } + try { + if ("order".equals(args[2])) { + hgThread2DB.testOrder(args[4]); + } else { + hgThread2DB.startMultiprocessInsert(args[2]); + } + } catch (IOException e) { + e.printStackTrace(); + } + break; + case "-query": + HgThread2DB hgDB = new HgThread2DB(args[1]); + try { + hgDB.startMultiprocessQuery("12", args[2]); + } catch (IOException e) { + e.printStackTrace(); + } + break; + case "-scan": + if (args.length < 4) { + log.warn("参数类型 -scan pd graphName tableName"); + } else { + doScan(args[1], args[2], args[3]); + } + break; + case "-shard": + GrpcShardScanner scanner = new GrpcShardScanner(); + scanner.getData(); + break; + case "-shard-single": + scanner = new GrpcShardScanner(); + scanner.getDataSingle(); + break; + default: + log.warn("参数类型错误,未执行任何程序"); + } + } + } + + private void doScan(String pd, String graphName, String tableName) throws PDException { + HgStoreClient storeClient = HgStoreClient.create(PDConfig.of(pd) + .setEnableCache(true)); + + HgStoreScanner storeScanner = HgStoreScanner.of(storeClient, graphName); + storeScanner.scanTable2(tableName); + } +} diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/loader/HgThread2DB.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/loader/HgThread2DB.java new file mode 100644 index 0000000000..67a77d5831 --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/loader/HgThread2DB.java @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli.loader; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hugegraph.pd.client.PDClient; +import org.apache.hugegraph.pd.client.PDConfig; +import org.apache.hugegraph.store.HgKvEntry; +import org.apache.hugegraph.store.HgKvIterator; +import org.apache.hugegraph.store.HgOwnerKey; +import org.apache.hugegraph.store.HgScanQuery; +import org.apache.hugegraph.store.HgStoreClient; +import org.apache.hugegraph.store.HgStoreSession; +import org.apache.hugegraph.store.cli.util.HgCliUtil; +import org.apache.hugegraph.store.client.grpc.KvCloseableIterator; +import org.apache.hugegraph.store.client.util.MetricX; + +import lombok.extern.slf4j.Slf4j; + +/** + * 使用pd,支持raft + * 读取文件并多线程进行入库 + */ +@Slf4j +public class HgThread2DB { + + /*正在进行和在排队的任务的总数*/ + private static final AtomicInteger taskTotal = new AtomicInteger(0); + private static final AtomicInteger queryTaskTotal = new AtomicInteger(0); + private static final AtomicLong insertDataCount = new AtomicLong(); + private static final AtomicLong queryCount = new AtomicLong(); + private static final AtomicLong totalQueryCount = new AtomicLong(); + private static final AtomicLong longId = new AtomicLong(); + private static final CountDownLatch countDownLatch = null; + private static PDClient pdClient; + private static ThreadPoolExecutor threadPool = null; + private static ThreadPoolExecutor queryThreadPool = null; + private static int limitScanBatchCount = 100; + private static ArrayBlockingQueue listQueue = null; + private final HgStoreClient storeClient; + public String graphName = "hugegraphtest"; + volatile long startTime = System.currentTimeMillis(); + + public HgThread2DB(String pdAddr) { + int threadCount = Runtime.getRuntime().availableProcessors(); + + listQueue = new ArrayBlockingQueue>(100000000); + queryThreadPool = new ThreadPoolExecutor(500, 1000, + 200, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1000)); + threadPool = new ThreadPoolExecutor(threadCount * 2, threadCount * 3, + 200, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(threadCount + 100)); + storeClient = HgStoreClient.create(PDConfig.of(pdAddr) + .setEnableCache(true)); + pdClient = storeClient.getPdClient(); + } + + public void setGraphName(String graphName) { + this.graphName = graphName; + log.info("setGraphName {}", graphName); + } + + public boolean singlePut(String tableName + , List keys) throws InterruptedException { + HgStoreSession session = storeClient.openSession(graphName); + session.beginTx(); + + keys.forEach((strKey) -> { + insertDataCount.getAndIncrement(); + int j = strKey.indexOf("\t"); +// byte[] key = HgCliUtil.toBytes(strKey.substring(0, j)); + HgOwnerKey hgKey = HgCliUtil.toOwnerKey(strKey.substring(0, j), strKey); + byte[] value = HgCliUtil.toBytes(strKey.substring(j + 1)); + session.put(tableName, hgKey, value); + + }); + if (insertDataCount.get() > 10000000) { + synchronized (insertDataCount) { + long count = insertDataCount.get(); + insertDataCount.set(0); + if (count > 10000000) { + log.info("count : " + count + " qps : " + + count * 1000 / (System.currentTimeMillis() - startTime) + + " threadCount : " + taskTotal); + startTime = System.currentTimeMillis(); + } + } + } + if (!keys.isEmpty()) { + if (session.isTx()) { + session.commit(); + } else { + session.rollback(); + } + } + + return true; + } + + public boolean singlePut(String tableName) throws InterruptedException { + HgStoreSession session = storeClient.openSession(graphName); + session.beginTx(); + + int maxlist = 100; + + for (int y = 0; y < maxlist; y++) { + insertDataCount.getAndIncrement(); + String strLine = getLong() + getLong() + getLong() + getLong(); + HgOwnerKey hgKey = HgCliUtil.toOwnerKey(strLine, strLine); + byte[] value = HgCliUtil.toBytes(strLine); + session.put(tableName, hgKey, value); + } + + if (insertDataCount.get() > 10000000) { + synchronized (insertDataCount) { + long count = insertDataCount.get(); + insertDataCount.set(0); + if (count > 10000000) { + log.info("count : " + count + " qps : " + + count * 1000 / (System.currentTimeMillis() - startTime) + + " threadCount : " + taskTotal); + startTime = System.currentTimeMillis(); + } + } + } + + if (session.isTx()) { + session.commit(); + } else { + session.rollback(); + } + + return true; + } + + public boolean testOrder(String input) { + String tableName = "hugegraph02"; + HgStoreSession session = storeClient.openSession(graphName); + session.beginTx(); + int loop = Integer.parseInt(input); + if (loop == 0) { + loop = 2000; + } + for (int i = 0; i < loop; i++) { + long startTime = System.currentTimeMillis(); + HgOwnerKey hgOwnerKey = + HgCliUtil.toOwnerKey(startTime + "owner:" + i, startTime + "k:" + i); + session.put(tableName, hgOwnerKey, HgCliUtil.toBytes(i)); + } + + if (session.isTx()) { + session.commit(); + } else { + session.rollback(); + } + + try { + HgKvIterator iterable = session.scanIterator(tableName); + int x = 0; + while (iterable.hasNext()) { + HgKvEntry entry = iterable.next(); + x++; + } + log.info("x={}", x); + } catch (Exception e) { + log.error("query error, message: {}", e.getMessage()); + } + + return true; + } + + /** + * 多线程读取文件入库 + * + * @throws IOException + * @throws InterruptedException + */ + public void startMultiprocessInsert(String filepath) throws IOException { + log.info("--- start startMultiprocessInsert---"); + startTime = System.currentTimeMillis(); + File readfile = new File(filepath); + MetricX metrics = null; + long dataCount = 0; + if (readfile.exists()) { + // 读取文件 + InputStreamReader isr = new InputStreamReader(new FileInputStream(readfile), + StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(isr); + + String strLine = null; + String tableName = HgCliUtil.TABLE_NAME; + // 积攒到多少个后执行线程入库,10万 + int maxlist = 100000; + List keys = new ArrayList<>(maxlist); + metrics = MetricX.ofStart(); + try { + while ((strLine = reader.readLine()) != null) { + keys.add(strLine); + dataCount++; + + // 读取文件中的10000条数据,启一个线程入库 + if (dataCount % maxlist == 0) { + List finalKeys = keys; + Runnable task = () -> { + try { + if (!finalKeys.isEmpty()) { + boolean ret = singlePut(tableName, finalKeys); + } + } catch (Exception e) { + e.printStackTrace(); + } + taskTotal.decrementAndGet(); + synchronized (taskTotal) { + taskTotal.notifyAll(); + } + }; + taskTotal.getAndIncrement(); + threadPool.execute(task); + + while (taskTotal.get() > 100) { + synchronized (taskTotal) { + taskTotal.wait(); + } + } + // keys.remove(0); + keys = new ArrayList<>(maxlist); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + isr.close(); + reader.close(); + // 把剩余的入库 + if (!keys.isEmpty()) { + List finalKeys1 = keys; + Runnable task = () -> { + try { + boolean ret = singlePut(tableName, finalKeys1); + } catch (Exception e) { + e.printStackTrace(); + } + taskTotal.decrementAndGet(); + synchronized (taskTotal) { + taskTotal.notifyAll(); + } + }; + threadPool.execute(task); + taskTotal.getAndIncrement(); + } + while (taskTotal.get() > 0) { + synchronized (taskTotal) { + try { + taskTotal.wait(1000); + if (taskTotal.get() > 0) { + System.out.println("wait thread exit " + taskTotal.get()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + threadPool.shutdown(); + + } else { + System.out.println("样本文件不存在:" + filepath); + } + metrics.end(); + log.info("*************************************************"); + log.info(" 主进程执行时间:" + metrics.past() / 1000 + "秒,一共执行:" + dataCount + "条"); + log.info("*************************************************"); + System.out.println(" 主进程执行时间 " + metrics.past() / 1000 + "秒"); + System.out.println("-----主进程执行结束---------"); + } + + /** + * 多线程读取文件入库 + * + * @throws IOException + * @throws InterruptedException + */ + public void autoMultiprocessInsert() throws IOException { + log.info("--- start autoMultiprocessInsert---"); + startTime = System.currentTimeMillis(); + + MetricX metrics = null; + long dataCount = 0; + + String strLine = null; + String tableName = HgCliUtil.TABLE_NAME; + // 积攒到多少个后执行线程入库,10万 + int maxlist = 100000; + List keys = new ArrayList<>(maxlist); + for (int x = 0; x < 10000000; x++) { + metrics = MetricX.ofStart(); + try { + Runnable task = () -> { + try { + boolean ret = singlePut(tableName); + } catch (Exception e) { + e.printStackTrace(); + } + taskTotal.decrementAndGet(); + synchronized (taskTotal) { + taskTotal.notifyAll(); + } + }; + taskTotal.getAndIncrement(); + threadPool.execute(task); + + while (taskTotal.get() > 100) { + synchronized (taskTotal) { + taskTotal.wait(); + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + while (taskTotal.get() > 0) { + synchronized (taskTotal) { + try { + taskTotal.wait(1000); + if (taskTotal.get() > 0) { + System.out.println("wait thread exit " + taskTotal.get()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + threadPool.shutdown(); + + metrics.end(); + log.info("*************************************************"); + log.info(" 主进程执行时间:" + metrics.past() / 1000 + "秒,一共执行:" + dataCount + "条"); + log.info("*************************************************"); + System.out.println(" 主进程执行时间 " + metrics.past() / 1000 + "秒"); + System.out.println("-----主进程执行结束---------"); + } + + public String getLong() { + //如果需要更长 或者更大冗余空间, 只需要 time * 10^n 即可 + //当前可保证1毫秒 生成 10000条不重复 + return String.format("%019x", longId.getAndIncrement()); + } + + /** + * 执行查询,并将查询的结果做为下一次迭代的点放入队列 + */ + private void queryAnd2Queue() { + try { + HgStoreSession session = storeClient.openSession(graphName); + HashSet hashSet = new HashSet<>(); + while (!listQueue.isEmpty()) { + + log.info(" ====== start scanBatch2 count:{} list:{}=============", + queryThreadPool.getActiveCount(), listQueue.size()); + List keys = (List) listQueue.take(); + List newQueryList = new ArrayList<>(); + + KvCloseableIterator> iterators = + session.scanBatch2( + HgScanQuery.prefixIteratorOf(HgCliUtil.TABLE_NAME, keys.iterator()) + ); + + while (iterators.hasNext()) { + HgKvIterator iterator = iterators.next(); + int insertQueueCount = 0; + while (iterator.hasNext()) { + HgKvEntry entry = iterator.next(); + String newPoint = HgCliUtil.toStr(entry.value()); +// log.info("query_key =" + newPoint); + // 统计查询次数 + if (!newPoint.isEmpty() && hashSet.add(newPoint)) { + queryCount.getAndIncrement(); + totalQueryCount.getAndIncrement(); + + HgOwnerKey hgKey = HgCliUtil.toOwnerKey(newPoint, newPoint); + newQueryList.add(hgKey); + + if (queryCount.get() > 1000000) { + synchronized (queryCount) { + long count = queryCount.get(); + queryCount.set(0); + if (count > 1000000) { + log.info("count : " + count + " qps : " + count * 1000 / + (System.currentTimeMillis() - + startTime) + + " threadCount : " + + queryThreadPool.getActiveCount() + " queueSize:" + + listQueue.size()); + startTime = System.currentTimeMillis(); + } + } + } + // 达到1万个点后,去查询一次 + if (newQueryList.size() > 10000 && listQueue.size() < 10000) { + listQueue.put(newQueryList); + insertQueueCount++; + newQueryList = new ArrayList<>(); + if (insertQueueCount > 2) { + break; + } + } + } + } + } + // 一次查询如果不够1万,单独提交一次查询,确保所有的结果都能执行查询 + if (!newQueryList.isEmpty() && listQueue.size() < 1000) { + listQueue.put(newQueryList); + } + + iterators.close(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + log.info("============= thread done =============="); + countDownLatch.countDown(); + } + + /** + * 多线程查询 + * + * @param point 起始查询点,后续根据这个点查询到的value做为下一次的查询条件进行迭代 + * @param scanCount 允许启动的线程数量 + * @throws IOException + * @throws InterruptedException + */ + public void startMultiprocessQuery(String point, String scanCount) throws IOException, + InterruptedException { + log.info("--- start startMultiprocessQuery---"); + startTime = System.currentTimeMillis(); + MetricX metrics = MetricX.ofStart(); + limitScanBatchCount = Integer.parseInt(scanCount); + + CountDownLatch latch = new CountDownLatch(limitScanBatchCount); + HgStoreSession session = storeClient.openSession(graphName); + + final AtomicLong[] counter = {new AtomicLong()}; + final long[] start = {System.currentTimeMillis()}; + + LinkedBlockingQueue[] queue = new LinkedBlockingQueue[limitScanBatchCount]; + for (int i = 0; i < limitScanBatchCount; i++) { + queue[i] = new LinkedBlockingQueue(); + } + List strKey = Arrays.asList( + "20727483", "50329304", "26199460", "1177521", "27960125", + "30440025", "15833920", "15015183", "33153097", "21250581"); + strKey.forEach(key -> { + log.info("newkey:{}", key); + HgOwnerKey hgKey = HgCliUtil.toOwnerKey(key, key); + queue[0].add(hgKey); + }); + + for (int i = 0; i < limitScanBatchCount; i++) { + int finalI = i; + KvCloseableIterator> iterators = + session.scanBatch2( + HgScanQuery.prefixIteratorOf(HgCliUtil.TABLE_NAME, + new Iterator() { + HgOwnerKey current = null; + + @Override + public boolean hasNext() { + while (current == null) { + try { + current = + (HgOwnerKey) queue[finalI].poll( + 1, + TimeUnit.SECONDS); + } catch ( + InterruptedException e) { + // + } + } + if (current == null) { + log.warn( + "===== current is " + + "null =========="); + } + return current != null; + } + + @Override + public HgOwnerKey next() { + return current; + } + }) + ); + + new Thread(() -> { + while (iterators.hasNext()) { + HgKvIterator iterator = iterators.next(); + long c = 0; + while (iterator.hasNext()) { + String newPoint = HgCliUtil.toStr(iterator.next().value()); + HgOwnerKey newHgKey = HgCliUtil.toOwnerKey(newPoint, newPoint); + if (queue[(int) (c % limitScanBatchCount)].size() < 1000000) { + queue[(int) (c % limitScanBatchCount)].add(newHgKey); + } + c++; + } + if (counter[0].addAndGet(c) > 1000000) { + synchronized (counter) { + if (counter[0].get() > 10000000) { + log.info("count {}, qps {}", counter[0].get(), + counter[0].get() * 1000 / + (System.currentTimeMillis() - start[0])); + start[0] = System.currentTimeMillis(); + counter[0].set(0); + } + } + } + } + }, "client query thread:" + i).start(); + log.info("===== read thread exit =========="); + } + latch.await(); + + metrics.end(); + log.info("*************************************************"); + log.info(" 主进程执行时间:" + metrics.past() / 1000 + "秒; 查询:" + totalQueryCount.get() + + "次,qps:" + totalQueryCount.get() * 1000 / metrics.past()); + log.info("*************************************************"); + System.out.println("-----主进程执行结束---------"); + } + +} diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/GrpcShardScanner.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/GrpcShardScanner.java new file mode 100644 index 0000000000..29dbbc281c --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/GrpcShardScanner.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli.scan; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hugegraph.store.grpc.GraphStoreGrpc; +import org.apache.hugegraph.store.grpc.GraphStoreGrpc.GraphStoreStub; +import org.apache.hugegraph.store.grpc.Graphpb.ScanPartitionRequest; +import org.apache.hugegraph.store.grpc.Graphpb.ScanPartitionRequest.Reply; +import org.apache.hugegraph.store.grpc.Graphpb.ScanResponse; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class GrpcShardScanner { + + private final boolean closed = false; + private final AtomicInteger sum = new AtomicInteger(); + private final ConcurrentHashMap> + observers = new ConcurrentHashMap<>(); + + public void getData() { + ExecutorService service = new ThreadPoolExecutor(500, Integer.MAX_VALUE, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>()); + long start = System.currentTimeMillis(); + + String[] addresses = new String[]{"10.14.139.71:8500", + "10.14.139.70:8500", + "10.14.139.69:8500"}; + int pSize = 72; + int size = pSize * addresses.length; + CountDownLatch latch = new CountDownLatch(size); + for (int j = 0; j < pSize; j++) { + for (int i = 0; i < addresses.length; i++) { + String address = addresses[i]; + int finalJ = j; + service.execute(() -> getData(finalJ, latch, address)); + } + } + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + long end = System.currentTimeMillis(); + long cost = end - start; + log.info("all rows are: {}, cost: {},avg: {}", sum.get(), + cost, sum.get() / cost * 1000); + } + + public void getData(int pId, CountDownLatch latch, String address) { + try { + ScanPartitionRequest.Builder builder = + ScanPartitionRequest.newBuilder(); + ScanPartitionRequest.Request.Builder srb = + ScanPartitionRequest.Request.newBuilder(); + ScanPartitionRequest.Request request = + srb.setGraphName("DEFAULT/hugegraph2/g") + .setScanType( + ScanPartitionRequest.ScanType.SCAN_EDGE) + .setTable("g+oe").setBoundary(0x10) + .setPartitionId(pId).build(); + ManagedChannel c = + ManagedChannelBuilder.forTarget(address) + .usePlaintext().build(); + int maxSize = 1024 * 1024 * 1024; + GraphStoreStub stub; + stub = GraphStoreGrpc.newStub(c) + .withMaxInboundMessageSize(maxSize) + .withMaxOutboundMessageSize(maxSize); + + AtomicInteger count = new AtomicInteger(); + long start = System.currentTimeMillis(); + long id = Thread.currentThread().getId(); + StreamObserver ro = + new StreamObserver() { + @Override + public void onNext(ScanResponse value) { + try { + int edgeSize = value.getEdgeCount(); + int vertexSize = value.getVertexCount(); + if (request.getScanType().equals( + ScanPartitionRequest.ScanType.SCAN_VERTEX)) { + count.getAndAdd(vertexSize); + } else { + count.getAndAdd(edgeSize); + } + // String print = JsonFormat.printer().print + // (value); + // System.out.println(print); + ScanPartitionRequest.Builder builder = + ScanPartitionRequest.newBuilder(); + builder.setScanRequest(request); + Reply.Builder reply = Reply.newBuilder(); + reply.setSeqNo(1); + builder.setReplyRequest(reply); + observers.get(id).onNext(builder.build()); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void onError(Throwable t) { + log.warn("调用grpc接口发生错误", t); + latch.countDown(); + } + + @Override + public void onCompleted() { + long time = System.currentTimeMillis() - start; + log.info("scan id : {}, complete: {} ,time:{}", + pId, count.get(), time); + sum.addAndGet(count.get()); + latch.countDown(); + } + }; + StreamObserver observer = + stub.scanPartition(ro); + observers.put(id, observer); + builder.setScanRequest(request); + observer.onNext(builder.build()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void getDataSingle() { + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> getData(58, latch, "10.14.139.71:8500")).start(); + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + log.info("all rows are: {}", sum.get()); + } + +} diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/HgStoreCommitter.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/HgStoreCommitter.java new file mode 100644 index 0000000000..cf31e779f9 --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/HgStoreCommitter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli.scan; + +import org.apache.hugegraph.store.HgOwnerKey; +import org.apache.hugegraph.store.HgSessionManager; +import org.apache.hugegraph.store.HgStoreSession; +import org.apache.hugegraph.store.cli.util.HgCliUtil; +import org.apache.hugegraph.store.client.HgStoreNodeManager; + +/** + * 2022/2/28 + */ +public class HgStoreCommitter { + + protected final static HgStoreNodeManager nodeManager = HgStoreNodeManager.getInstance(); + + private final String graph; + + private HgStoreCommitter(String graph) { + this.graph = graph; + } + + public static HgStoreCommitter of(String graph) { + return new HgStoreCommitter(graph); + } + + protected HgStoreSession getStoreSession() { + return HgSessionManager.getInstance().openSession(this.graph); + } + + protected HgStoreSession getStoreSession(String graphName) { + return HgSessionManager.getInstance().openSession(graphName); + } + + public void put(String tableName, int amount) { + //*************** Put Benchmark **************//* + String keyPrefix = "PUT-BENCHMARK"; + HgStoreSession session = getStoreSession(); + + int length = String.valueOf(amount).length(); + + session.beginTx(); + + long start = System.currentTimeMillis(); + for (int i = 0; i < amount; i++) { + HgOwnerKey key = HgCliUtil.toOwnerKey( + keyPrefix + "-" + HgCliUtil.padLeftZeros(String.valueOf(i), length)); + byte[] value = HgCliUtil.toBytes(keyPrefix + "-V-" + i); + + session.put(tableName, key, value); + + if ((i + 1) % 100_000 == 0) { + HgCliUtil.println("---------- " + (i + 1) + " --------"); + HgCliUtil.println( + "Preparing took: " + (System.currentTimeMillis() - start) + " ms."); + session.commit(); + HgCliUtil.println( + "Committing took: " + (System.currentTimeMillis() - start) + " ms."); + start = System.currentTimeMillis(); + session.beginTx(); + } + } + + if (session.isTx()) { + session.commit(); + } + + } +} diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/HgStoreScanner.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/HgStoreScanner.java new file mode 100644 index 0000000000..bbc40ca867 --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/scan/HgStoreScanner.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli.scan; + +import java.util.Arrays; +import java.util.List; + +import org.apache.hugegraph.pd.client.PDClient; +import org.apache.hugegraph.pd.common.PDException; +import org.apache.hugegraph.pd.grpc.Metapb; +import org.apache.hugegraph.store.HgKvEntry; +import org.apache.hugegraph.store.HgKvIterator; +import org.apache.hugegraph.store.HgKvStore; +import org.apache.hugegraph.store.HgScanQuery; +import org.apache.hugegraph.store.HgSessionManager; +import org.apache.hugegraph.store.HgStoreClient; +import org.apache.hugegraph.store.HgStoreSession; +import org.apache.hugegraph.store.cli.util.HgCliUtil; +import org.apache.hugegraph.store.cli.util.HgMetricX; +import org.apache.hugegraph.store.client.grpc.KvCloseableIterator; +import org.apache.hugegraph.store.client.util.HgStoreClientConfig; +import org.apache.hugegraph.store.client.util.MetricX; + +import lombok.extern.slf4j.Slf4j; + +/** + * 2022/2/14 + */ +@Slf4j +public class HgStoreScanner { + + public static final byte[] EMPTY_BYTES = new byte[0]; + private final HgStoreClient storeClient; + private final String graphName; + private long modNumber = 1_000_000; + private int max = 10_000_000; + + private HgStoreScanner(HgStoreClient storeClient, String graph) { + this.storeClient = storeClient; + this.graphName = graph; + } + + public static HgStoreScanner of(HgStoreClient storeClient, String graph) { + return new HgStoreScanner(storeClient, graph); + } + + public long getModNumber() { + return modNumber; + } + + public void setModNumber(int modNumber) { + if (modNumber <= 0) { + return; + } + this.modNumber = modNumber; + } + + public int getMax() { + return max; + } + + public void setMax(int max) { + if (modNumber <= 0) { + return; + } + this.max = max; + } + + protected HgStoreSession getStoreSession() { + return HgSessionManager.getInstance().openSession(this.graphName); + } + + protected HgStoreSession getStoreSession(String graphName) { + return HgSessionManager.getInstance().openSession(graphName); + } + + public void scanTable(String tableName) { + log.info("Starting scan table [{}] of graph [{}] ...", tableName, graphName); + HgMetricX hgMetricX = HgMetricX.ofStart(); + HgStoreSession session = getStoreSession(); + int count = 0; + KvCloseableIterator> iterator = + session.scanBatch2(HgScanQuery.tableOf(tableName)); + + long start = System.currentTimeMillis(); + while (iterator.hasNext()) { + HgKvIterator iterator2 = iterator.next(); + while (iterator2.hasNext()) { + + count++; + iterator2.next(); + if (count % (modNumber) == 0) { + log.info("Scanning keys: " + count + " time is " + modNumber * 1000 + / + (System.currentTimeMillis() - + start)); + start = System.currentTimeMillis(); + } + if (count == max) { + break; + } + + } + } + iterator.close(); + + hgMetricX.end(); + log.info("*************************************************"); + log.info("************* Scanning Completed **************"); + log.info("Graph: {}", graphName); + log.info("Table: {}", tableName); + log.info("Keys: {}", count); + log.info("Max: {}", max); + log.info("Waiting: {} seconds.", MetricX.getIteratorWait() / 1000); + log.info("Total: {} seconds.", hgMetricX.past() / 1000); + log.info("Iterator: [{}]", iterator.getClass().getSimpleName()); + log.info("Page: {}", HgStoreClientConfig.of().getNetKvScannerPageSize()); + log.info("*************************************************"); + } + + public void scanHash() { + + String tableName = "g+i"; + HgMetricX hgMetricX = HgMetricX.ofStart(); + String graphName = "/DEFAULT/graphs/hugegraph1/"; + HgStoreSession session = getStoreSession(graphName); + int count = 0; + String query = + "{\"conditions\":[{\"cls\":\"S\",\"el\":{\"key\":\"ID\",\"relation\":\"SCAN\"," + + "\"value\"" + + ":{\"start\":\"61180\",\"end\":\"63365\",\"length\":0}}}]," + + "\"optimizedType\":\"NONE\",\"ids\":[]," + + "\"mustSortByInput\":true,\"resultType\":\"EDGE\",\"offset\":0," + + "\"actualOffset\":0,\"actualStoreOffset\":" + + "0,\"limit\":9223372036854775807,\"capacity\":-1,\"showHidden\":false," + + "\"showDeleting\":false," + + "\"showExpired\":false,\"olap\":false,\"withProperties\":false,\"olapPks\":[]}"; + //HgKvIterator iterator = session.scanIterator(tableName,0,715827883, + // HgKvStore.SCAN_ANY,null); + + //HgKvIterator iterator = session.scanIterator(tableName,61180,63365, 348, null); + //HgKvIterator iterator = session.scanIterator(tableName,0,65535, 348, null); + HgKvIterator iterator = session.scanIterator(tableName); + while (iterator.hasNext()) { + + count++; + //iterator.next(); + // if (count % (modNumber) == 0) { + // log.info("Scanning keys: " + count); + HgCliUtil.println(Arrays.toString(iterator.next().key())); + // } + if (count == max) { + break; + } + + } + + hgMetricX.end(); + log.info("*************************************************"); + log.info("************* Scanning Completed **************"); + log.info("Graph: {}", this.graphName); + log.info("Table: {}", tableName); + log.info("Keys: {}", count); + log.info("Max: {}", max); + log.info("Waiting: {} seconds.", MetricX.getIteratorWait() / 1000); + log.info("Total: {} seconds.", hgMetricX.past() / 1000); + log.info("Iterator: [{}]", iterator.getClass().getSimpleName()); + log.info("Page: {}", HgStoreClientConfig.of().getNetKvScannerPageSize()); + log.info("*************************************************"); + } + + public void scanTable2(String tableName) throws PDException { + // java -jar hg-store-cli-3.6.0-SNAPSHOT.jar -scan 10.45.30.212:8989 "DEFAULT/case_112/g" + // g+ie + PDClient pdClient = storeClient.getPdClient(); + List partitions = pdClient.getPartitions(0, graphName); + HgStoreSession session = storeClient.openSession(graphName); + int count = 0; + byte[] position = null; + HgMetricX hgMetricX = HgMetricX.ofStart(); + for (Metapb.Partition partition : partitions) { + while (true) { + try (HgKvIterator iterator = session.scanIterator(tableName, + (int) (partition.getStartKey()), + (int) (partition.getEndKey()), + HgKvStore.SCAN_HASHCODE, + EMPTY_BYTES)) { + if (position != null) { + iterator.seek(position); + } + while (iterator.hasNext()) { + iterator.next(); + count++; + if (count % 3000 == 0) { + if (iterator.hasNext()) { + iterator.next(); + position = iterator.position(); + System.out.println("count is " + count); + } else { + position = null; + } + break; + } + } + if (!iterator.hasNext()) { + position = null; + break; + } + } + } + } + hgMetricX.end(); + log.info("*************************************************"); + log.info("************* Scanning Completed **************"); + log.info("Graph: {}", graphName); + log.info("Table: {}", tableName); + log.info("Keys: {}", count); + log.info("Total: {} seconds.", hgMetricX.past() / 1000); + log.info("*************************************************"); + } + +} diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/util/HgCliUtil.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/util/HgCliUtil.java new file mode 100644 index 0000000000..5ec7b5ea9d --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/util/HgCliUtil.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli.util; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import org.apache.hugegraph.store.HgKvEntry; +import org.apache.hugegraph.store.HgOwnerKey; +import org.apache.hugegraph.store.HgStoreSession; +import org.apache.hugegraph.store.client.util.HgStoreClientConst; + +/** + * 2022/2/14 + */ +public class HgCliUtil { + + public final static String TABLE_NAME = "cli-table"; + + public static Map batchPut(HgStoreSession session, String keyPrefix) { + return batchPut(session, keyPrefix, 100); + } + + public static Map batchPut(HgStoreSession session, String keyPrefix, + int loop) { + return batchPut(session, TABLE_NAME, keyPrefix, loop); + } + + public static Map batchPut(HgStoreSession session, String tableName + , String keyPrefix, int loop) { + return batchPut(session, tableName, keyPrefix, loop, 1, key -> toOwnerKey(key)); + } + + public static Map batchPut(HgStoreSession session, String tableName + , String keyPrefix, int loop, int start) { + return batchPut(session, tableName, keyPrefix, loop, start, key -> toOwnerKey(key)); + } + + public static Map batchPut(HgStoreSession session, String tableName + , String keyPrefix, int loop, Function f) { + return batchPut(session, tableName, keyPrefix, loop, 1, f); + } + + public static Map batchPut(HgStoreSession session, String tableName + , String keyPrefix, int loop, int start, Function f) { + + Map res = new LinkedHashMap<>(); + + int length = String.valueOf(loop).length(); + + session.beginTx(); + for (int i = start; i <= loop; i++) { + + HgOwnerKey key = f.apply(keyPrefix + "-" + padLeftZeros(String.valueOf(i), length)); + + byte[] value = toBytes(keyPrefix + "-V-" + i); + res.put(key, value); + session.put(tableName, key, value); + + if ((i + 1) % 10000 == 0) { + println("commit: " + (i + 1)); + session.commit(); + session.beginTx(); + } + } + if (session.isTx()) { + session.commit(); + } + + return res; + } + + public static void printNum(List list, String title) { + if (list == null) { + return; + } + + println(title + " size: " + list.size()); + } + + public static void println(Iterator iterator) { + if (iterator == null) { + return; + } + while (iterator.hasNext()) { + println(iterator.next()); + } + + } + + public static void printOwner(List list) { + if (list == null) { + return; + } + + for (HgOwnerKey entry : list) { + println(entry); + } + } + + public static void println(List list) { + if (list == null) { + return; + } + + for (HgKvEntry entry : list) { + println(entry); + } + } + + public static void println(List list, int mod) { + if (list == null) { + return; + } + + for (int i = 0; i < list.size(); i++) { + if (i % mod == 0) { + println(list.get(i)); + } + } + } + + public static void println(HgKvEntry kv) { + if (kv == null) { + System.out.println("null"); + return; + } + println("[ " + toStr(kv.key()) + " : " + toStr(kv.value()) + " ]"); + } + + public static void println(HgOwnerKey key) { + if (key == null) { + System.out.println("null"); + return; + } + println("[ " + toInt(key.getOwner()) + " : " + toStr(key.getKey()) + " ]"); + } + + public static void println(String str) { + System.out.println(str); + } + + public static HgOwnerKey toOwnerKey(String key) { + return new HgOwnerKey(getOwner(key), toBytes(key)); + } + + public static HgOwnerKey toOwnerKey(byte[] key) { + return new HgOwnerKey(getOwner(key), key); + } + + private static byte[] getOwner(String key) { + return getOwner(toBytes(key)); + } + + private static byte[] getOwner(byte[] key) { + return toBytes(Arrays.hashCode(key)); + } + + public static HgOwnerKey toAllPartitionKey(String key) { + return HgOwnerKey.of(HgStoreClientConst.ALL_PARTITION_OWNER, toBytes(key)); + } + + public static HgOwnerKey toOwnerKey(String owner, String key) { + return HgOwnerKey.of(toBytes(owner), toBytes(key)); + } + + public static String toStr(byte[] b) { + if (b == null) { + return ""; + } + if (b.length == 0) { + return ""; + } + return new String(b, StandardCharsets.UTF_8); + } + + public static byte[] toBytes(String str) { + if (str == null) { + return null; + } + return str.getBytes(StandardCharsets.UTF_8); + } + + public static byte[] toBytes(long l) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(l); + return buffer.array(); + } + + private static byte[] toBytes(final int i) { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.putInt(i); + return buffer.array(); + } + + public static long toLong(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.put(bytes); + buffer.flip();//need flip + return buffer.getLong(); + } + + public static long toInt(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES); + buffer.put(bytes); + buffer.flip();//need flip + return buffer.getInt(); + } + + public static String padLeftZeros(String str, int n) { + return String.format("%1$" + n + "s", str).replace(' ', '0'); + } + + public static String toSuffix(int num, int length) { + return "-" + padLeftZeros(String.valueOf(num), length); + } + + public static int amountOf(List list) { + if (list == null) { + return 0; + } + return list.size(); + } + + public static int amountOf(Iterator iterator) { + if (iterator == null) { + return 0; + } + int count = 0; + while (iterator.hasNext()) { + iterator.next(); + count++; + } + return count; + } +} diff --git a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/util/HgMetricX.java b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/util/HgMetricX.java new file mode 100644 index 0000000000..3f4cdf3afd --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/util/HgMetricX.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.store.cli.util; + +/** + * 2022/1/29 + */ +public class HgMetricX { + + private long start; + private long end; + + private long waitStart = System.currentTimeMillis(); + private long waitTotal; + + private HgMetricX(long start) { + this.start = start; + } + + public static HgMetricX ofStart() { + return new HgMetricX(System.currentTimeMillis()); + } + + public long start() { + return this.start = System.currentTimeMillis(); + } + + public long end() { + return this.end = System.currentTimeMillis(); + } + + public long past() { + return this.end - this.start; + } + + public long getWaitTotal() { + return this.waitTotal; + } + + public void startWait() { + this.waitStart = System.currentTimeMillis(); + } + + public void appendWait() { + this.waitTotal += System.currentTimeMillis() - waitStart; + } +} diff --git a/hugegraph-store/hg-store-cli/src/main/resources/application.yml b/hugegraph-store/hg-store-cli/src/main/resources/application.yml new file mode 100644 index 0000000000..ebb865360a --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/resources/application.yml @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +net: + kv: + scanner: + buffer.size: 10000 + page.size: 20000 + chan.size: 1 + +pd: + address: 10.14.139.71:8686 + +scanner: + #graph: 'default/icbc_hstoregraph/g' + graph: 'DEFAULT/hugegraph/g' + + table: 'g+ie' + #table: 'g+v' + max: 10_000_000 + mod: 1_000_000 + +committer: + graph: 'STORE_CLI' + table: 'benchmark' + amount: 1_000_000 diff --git a/hugegraph-store/hg-store-cli/src/main/resources/hg-store-client.properties b/hugegraph-store/hg-store-cli/src/main/resources/hg-store-client.properties new file mode 100644 index 0000000000..08e31b904c --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/resources/hg-store-client.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#grpc.timeout.seconds=10 +#grpc.max.inbound.message.size= +#grpc.max.outbound.message.size= +net.kv.scanner.buffer.size=10000 +net.kv.scanner.page.size=20000 +#Unit:second +net.kv.scanner.have.next.timeout=1000 diff --git a/hugegraph-store/hg-store-cli/src/main/resources/log4j2.xml b/hugegraph-store/hg-store-cli/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..7b62afb0a7 --- /dev/null +++ b/hugegraph-store/hg-store-cli/src/main/resources/log4j2.xml @@ -0,0 +1,77 @@ + + + + + + + + logs + hg-store-client + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/hugegraph-store/hg-store-dist/pom.xml b/hugegraph-store/hg-store-dist/pom.xml new file mode 100644 index 0000000000..8d29f4815e --- /dev/null +++ b/hugegraph-store/hg-store-dist/pom.xml @@ -0,0 +1,152 @@ + + + + + + + org.apache.hugegraph + hugegraph-store + ${revision} + ../pom.xml + + 4.0.0 + + hg-store-dist + + + ${project.parent.basedir} + bash + ${project.basedir}/src/assembly + ${assembly.dir}/descriptor + ${assembly.dir}/static + apache-hugegraph-incubating-store-${project.parent.version} + hg-store-node + + + + org.apache.hugegraph + hg-store-node + ${revision} + + + + + + maven-assembly-plugin + 2.4 + + + assembly-hugegraph-store + package + + single + + + false + false + ${dist.dir} + + + ${assembly.descriptor.dir}/server-assembly.xml + + + ${final.name} + + + + + + maven-clean-plugin + + + + ${dist.dir} + + + + + + initialize + + clean + + + + + + maven-antrun-plugin + 1.3 + + + package + + run + + + + + + + + + tar zcvf \ + ${dist.dir}/${final.name}.tar.gz \ + ${final.name} || exit 1 + + rm -f ${dist.dir}/dist.sh + echo + echo "HugeGraph dist tar.gz available at: + ${dist.dir}/${final.name}.tar.gz" + echo + + + + + + + + + + + + + + + + + ant-contrib + ant-contrib + 1.0b3 + + + ant + ant + + + + + + + + + diff --git a/hugegraph-store/hg-store-dist/src/assembly/descriptor/server-assembly.xml b/hugegraph-store/hg-store-dist/src/assembly/descriptor/server-assembly.xml new file mode 100644 index 0000000000..3e7a6aeb55 --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/descriptor/server-assembly.xml @@ -0,0 +1,59 @@ + + + + distribution + false + + + dir + + + + + + + ${assembly.static.dir}/bin + bin + + * + + 755 + + + ${assembly.static.dir}/conf + conf + + * + + + + + + + + /lib + false + runtime + false + + org.apache.hugegraph:${executable.jar.name}:jar:* + + + + + diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/bin/libjemalloc.so b/hugegraph-store/hg-store-dist/src/assembly/static/bin/libjemalloc.so new file mode 100644 index 0000000000..768688dbf0 Binary files /dev/null and b/hugegraph-store/hg-store-dist/src/assembly/static/bin/libjemalloc.so differ diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/bin/libjemalloc_aarch64.so b/hugegraph-store/hg-store-dist/src/assembly/static/bin/libjemalloc_aarch64.so new file mode 100644 index 0000000000..f6fc60642d Binary files /dev/null and b/hugegraph-store/hg-store-dist/src/assembly/static/bin/libjemalloc_aarch64.so differ diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/bin/restart-hugegraph-store.sh b/hugegraph-store/hg-store-dist/src/assembly/static/bin/restart-hugegraph-store.sh new file mode 100644 index 0000000000..d1655353be --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/static/bin/restart-hugegraph-store.sh @@ -0,0 +1,32 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +function abs_path() { + SOURCE="${BASH_SOURCE[0]}" + while [ -h "$SOURCE" ]; do + DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" + SOURCE="$(readlink "$SOURCE")" + [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" + done + echo "$( cd -P "$( dirname "$SOURCE" )" && pwd )" +} + +BIN=$(abs_path) +. "$BIN"/stop-hugegraph-store.sh +. "$BIN"/start-hugegraph-store.sh diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/bin/start-hugegraph-store.sh b/hugegraph-store/hg-store-dist/src/assembly/static/bin/start-hugegraph-store.sh new file mode 100644 index 0000000000..991d42767a --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/static/bin/start-hugegraph-store.sh @@ -0,0 +1,211 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +function abs_path() { + SOURCE="${BASH_SOURCE[0]}" + while [ -h "$SOURCE" ]; do + DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" + SOURCE="$(readlink "$SOURCE")" + [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" + done + echo "$( cd -P "$( dirname "$SOURCE" )" && pwd )" +} + +BIN=$(abs_path) +TOP="$(cd "$BIN"/../ && pwd)" +CONF="$TOP/conf" +LIB="$TOP/lib" +PLUGINS="$TOP/plugins" +LOGS="$TOP/logs" +OUTPUT=${LOGS}/hugegraph-store-server.log +GITHUB="https://github.com" +PID_FILE="$BIN/pid" +arch=$(arch) + +echo "Current arch: ", "${arch}" +#if [[ $arch =~ "aarch64" ]];then +# export LD_PRELOAD="$TOP/bin/libjemalloc_aarch64.so" +#else +export LD_PRELOAD="$TOP/bin/libjemalloc.so" +#fi + +##pd/store max user processes, ulimit -u +# Reduce the maximum number of processes that can be opened by a normal dev/user +export PROC_LIMITN=1024 +#export PROC_LIMITN=20480 +##pd/store open files, ulimit -n +export FILE_LIMITN=1024 +#export FILE_LIMITN=1024000 + +function check_evn_limit() { + local limit_check=$(ulimit -n) + if [ ${limit_check} -lt ${FILE_LIMITN} ]; then + echo -e "${BASH_SOURCE[0]##*/}:${LINENO}:\E[1;32m ulimit -n 可以打开的最大文件描述符数太少,需要(${FILE_LIMITN})!! \E[0m" + return 1 + fi + limit_check=$(ulimit -u) + if [ ${limit_check} -lt ${PROC_LIMITN} ]; then + echo -e "${BASH_SOURCE[0]##*/}:${LINENO}:\E[1;32m ulimit -u 用户最大可用的进程数太少,需要(${PROC_LIMITN})!! \E[0m" + return 2 + fi + return 0 +} + +check_evn_limit +if [ $? != 0 ]; then + exit 8 +fi + +if [ -z "$GC_OPTION" ];then + GC_OPTION="" +fi +if [ -z "$USER_OPTION" ];then + USER_OPTION="" +fi +if [ -z "$OPEN_TELEMETRY" ];then + OPEN_TELEMETRY="false" +fi + +while getopts "g:j:y:" arg; do + case ${arg} in + g) GC_OPTION="$OPTARG" ;; + j) USER_OPTION="$OPTARG" ;; + # Telemetry is used to collect metrics, traces and logs + y) OPEN_TELEMETRY="$OPTARG" ;; + ?) echo "USAGE: $0 [-g g1] [-j xxx] [-y true|false]" && exit 1 ;; + esac +done + + + + +. "$BIN"/util.sh + +ensure_path_writable "$LOGS" +ensure_path_writable "$PLUGINS" + +# The maximum and minimum heap memory that service can use (for production env set it 36GB) +MAX_MEM=$((2 * 1024)) +MIN_MEM=$((1 * 512)) +EXPECT_JDK_VERSION=11 + +# Change to $BIN's parent +cd ${TOP} || exit + +# Find Java +if [ "$JAVA_HOME" = "" ]; then + JAVA="java" +else + JAVA="$JAVA_HOME/bin/java" +fi + +# check jdk version +JAVA_VERSION=$($JAVA -version 2>&1 | awk 'NR==1{gsub(/"/,""); print $3}' | awk -F'_' '{print $1}') +if [[ $? -ne 0 || $JAVA_VERSION < $EXPECT_JDK_VERSION ]]; then + echo "Please make sure that the JDK is installed and the version >= $EXPECT_JDK_VERSION" >> ${OUTPUT} + exit 1 +fi + +# Set Java options +if [ "$JAVA_OPTIONS" = "" ]; then + XMX=$(calc_xmx $MIN_MEM $MAX_MEM) + if [ $? -ne 0 ]; then + echo "Failed to start HugeGraphStoreServer, requires at least ${MIN_MEM}m free memory" \ + >> ${OUTPUT} + exit 1 + fi + JAVA_OPTIONS="-Xms${MIN_MEM}m -Xmx${XMX}m -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:+ParallelRefProcEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${LOGS} ${USER_OPTION} " + # JAVA_OPTIONS="-Xms${MIN_MEM}m -Xmx${XMX}m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${LOGS} ${USER_OPTION}" + + # Rolling out detailed GC logs + JAVA_OPTIONS="${JAVA_OPTIONS} -Xlog:gc=info:file=./logs/gc.log:tags,uptime,level:filecount=3,filesize=100m " +fi + +# Using G1GC as the default garbage collector (Recommended for large memory machines) +case "$GC_OPTION" in + g1) + echo "Using G1GC as the default garbage collector" + JAVA_OPTIONS="${JAVA_OPTIONS} -XX:+UseG1GC -XX:+ParallelRefProcEnabled \ + -XX:InitiatingHeapOccupancyPercent=50 -XX:G1RSetUpdatingPauseTimePercent=5" + ;; + "") ;; + *) + echo "Unrecognized gc option: '$GC_OPTION', only support 'g1' now" >> ${OUTPUT} + exit 1 +esac + +JVM_OPTIONS="-Dlog4j.configurationFile=${CONF}/log4j2.xml -Dfastjson.parser.safeMode=true" + +if [ "${OPEN_TELEMETRY}" == "true" ]; then + OT_JAR="opentelemetry-javaagent.jar" + OT_JAR_PATH="${PLUGINS}/${OT_JAR}" + + if [[ ! -e "${OT_JAR_PATH}" ]]; then + echo "## Downloading ${OT_JAR}..." + download "${PLUGINS}" \ + "${GITHUB}/open-telemetry/opentelemetry-java-instrumentation/releases/download/v2.1.0/${OT_JAR}" + + if [[ ! -e "${OT_JAR_PATH}" ]]; then + echo "## Error: Failed to download ${OT_JAR}." >>${OUTPUT} + exit 1 + fi + fi + + # Note: remember update it if we change the jar + expected_md5="e3bcbbe8ed9b6d840fa4c333b36f369f" + actual_md5=$(md5sum "${OT_JAR_PATH}" | awk '{print $1}') + + if [[ "${expected_md5}" != "${actual_md5}" ]]; then + echo "## Error: MD5 checksum verification failed for ${OT_JAR_PATH}." >>${OUTPUT} + echo "## Tips: Remove the file and try again." >>${OUTPUT} + exit 1 + fi + + # Note: check carefully if multi "javeagent" params are set + export JAVA_TOOL_OPTIONS="-javaagent:${PLUGINS}/${OT_JAR}" + export OTEL_TRACES_EXPORTER=otlp + export OTEL_METRICS_EXPORTER=none + export OTEL_LOGS_EXPORTER=none + export OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc + # 127.0.0.1:4317 is the port of otel-collector running in Docker located in + # 'hugegraph-server/hugegraph-dist/docker/example/docker-compose-trace.yaml'. + # Make sure the otel-collector is running before starting HugeGraphStore. + export OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://127.0.0.1:4317 + export OTEL_RESOURCE_ATTRIBUTES=service.name=store +fi + +#if [ "${JMX_EXPORT_PORT}" != "" ] && [ ${JMX_EXPORT_PORT} -ne 0 ] ; then +# JAVA_OPTIONS="${JAVA_OPTIONS} -javaagent:${LIB}/jmx_prometheus_javaagent-0.16.1.jar=${JMX_EXPORT_PORT}:${CONF}/jmx_exporter.yml" +#fi + +if [ $(ps -ef|grep -v grep| grep java|grep -cE ${CONF}) -ne 0 ]; then + echo "HugeGraphStoreServer is already running..." + exit 0 +fi + +echo "Starting HG-StoreServer..." + +exec ${JAVA} -Dname="HugeGraphStore" ${JVM_OPTIONS} ${JAVA_OPTIONS} -jar \ + -Dspring.config.location=${CONF}/application.yml \ + ${LIB}/hg-store-node-*.jar >> ${OUTPUT} 2>&1 & + +PID="$!" +# Write pid to file +echo "$PID" > "$PID_FILE" +echo "[+pid] $PID" diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/bin/stop-hugegraph-store.sh b/hugegraph-store/hg-store-dist/src/assembly/static/bin/stop-hugegraph-store.sh new file mode 100644 index 0000000000..8f898df7da --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/static/bin/stop-hugegraph-store.sh @@ -0,0 +1,48 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +abs_path() { + SOURCE="${BASH_SOURCE[0]}" + while [ -h "$SOURCE" ]; do + DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" + SOURCE="$(readlink "$SOURCE")" + [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" + done + echo "$( cd -P "$( dirname "$SOURCE" )" && pwd )" +} + +BIN=$(abs_path) +TOP="$(cd $BIN/../ && pwd)" + +. "$BIN"/util.sh + +PID_FILE=$BIN/pid +SERVER_SHUTDOWN_TIMEOUT_S=30 + +if [ ! -f ${PID_FILE} ]; then + echo "The pid file $PID_FILE doesn't exist" + exit 0 +fi + +PID=`cat $PID_FILE` +kill_process_and_wait "HugeGraphStoreServer" "$PID" "$SERVER_SHUTDOWN_TIMEOUT_S" + +if [ $? -eq 0 ]; then + rm "$PID_FILE" +fi diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/bin/util.sh b/hugegraph-store/hg-store-dist/src/assembly/static/bin/util.sh new file mode 100644 index 0000000000..93eae7b890 --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/static/bin/util.sh @@ -0,0 +1,372 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +function command_available() { + local cmd=$1 + if [ `command -v $cmd >/dev/null 2>&1` ]; then + return 1 + else + return 0 + fi +} + +# read a property from .properties file +function read_property() { + # file path + file_name=$1 + # replace "." to "\." + property_name=`echo $2 | sed 's/\./\\\./g'` + cat $file_name | sed -n -e "s/^[ ]*//g;/^#/d;s/^$property_name=//p" | tail -1 +} + +function write_property() { + local file=$1 + local key=$2 + local value=$3 + + local os=`uname` + case $os in + # Note: in mac os should use sed -i '' "xxx" to replace string, + # otherwise prompt 'command c expects \ followed by text'. + # See http://www.cnblogs.com/greedy-day/p/5952899.html + Darwin) sed -i '' "s!$key=.*!$key=$value!g" "$file" ;; + *) sed -i "s!$key=.*!$key=$value!g" "$file" ;; + esac +} + +function parse_yaml() { + local file=$1 + local version=$2 + local module=$3 + + cat $file | tr -d '\n {}'| awk -F',+|:' '''{ + pre=""; + for(i=1; i<=NF; ) { + if(match($i, /version/)) { + pre=$i; + i+=1 + } else { + result[pre"-"$i] = $(i+1); + i+=2 + } + } + } END {for(e in result) {print e": "result[e]}}''' \ + | grep "$version-$module" | awk -F':' '{print $2}' | tr -d ' ' && echo +} + +function process_num() { + num=`ps -ef | grep $1 | grep -v grep | wc -l` + return $num +} + +function process_id() { + pid=`ps -ef | grep $1 | grep -v grep | awk '{print $2}'` + return $pid +} + +# check the port of rest server is occupied +function check_port() { + local port=`echo $1 | awk -F':' '{print $3}'` + if ! command_available "lsof"; then + echo "Required lsof but it is unavailable" + exit 1 + fi + lsof -i :$port >/dev/null + if [ $? -eq 0 ]; then + echo "The port $port has already been used" + exit 1 + fi +} + +function crontab_append() { + local job="$1" + crontab -l | grep -F "$job" >/dev/null 2>&1 + if [ $? -eq 0 ]; then + return 1 + fi + (crontab -l ; echo "$job") | crontab - +} + +function crontab_remove() { + local job="$1" + # check exist before remove + crontab -l | grep -F "$job" >/dev/null 2>&1 + if [ $? -eq 1 ]; then + return 0 + fi + + crontab -l | grep -Fv "$job" | crontab - + + # Check exist after remove + crontab -l | grep -F "$job" >/dev/null 2>&1 + if [ $? -eq 0 ]; then + return 1 + else + return 0 + fi +} + +# wait_for_startup friendly_name host port timeout_s +function wait_for_startup() { + local pid="$1" + local server_name="$2" + local server_url="$3" + local timeout_s="$4" + + local now_s=`date '+%s'` + local stop_s=$(( $now_s + $timeout_s )) + + local status + + echo -n "Connecting to $server_name ($server_url)" + while [ $now_s -le $stop_s ]; do + echo -n . + process_status "$server_name" "$pid" >/dev/null + if [ $? -eq 1 ]; then + echo "Starting $server_name failed" + return 1 + fi + + status=`curl -o /dev/null -s -k -w %{http_code} $server_url` + if [[ $status -eq 200 || $status -eq 401 ]]; then + echo "OK" + echo "Started [pid $pid]" + return 0 + fi + sleep 2 + now_s=`date '+%s'` + done + + echo "The operation timed out when attempting to connect to $server_url" >&2 + return 1 +} + +function free_memory() { + local free="" + local os=`uname` + if [ "$os" == "Linux" ]; then + local mem_free=`cat /proc/meminfo | grep -w "MemFree" | awk '{print $2}'` + local mem_buffer=`cat /proc/meminfo | grep -w "Buffers" | awk '{print $2}'` + local mem_cached=`cat /proc/meminfo | grep -w "Cached" | awk '{print $2}'` + if [[ "$mem_free" == "" || "$mem_buffer" == "" || "$mem_cached" == "" ]]; then + echo "Failed to get free memory" + exit 1 + fi + free=`expr $mem_free + $mem_buffer + $mem_cached` + free=`expr $free / 1024` + elif [ "$os" == "Darwin" ]; then + local pages_free=`vm_stat | awk '/Pages free/{print $0}' | awk -F'[:.]+' '{print $2}' | tr -d " "` + local pages_inactive=`vm_stat | awk '/Pages inactive/{print $0}' | awk -F'[:.]+' '{print $2}' | tr -d " "` + local pages_available=`expr $pages_free + $pages_inactive` + free=`expr $pages_available \* 4096 / 1024 / 1024` + else + echo "Unsupported operating system $os" + exit 1 + fi + echo $free +} + +function calc_xmx() { + local min_mem=$1 + local max_mem=$2 + # Get machine available memory + local free=`free_memory` + local half_free=$[free/2] + + local xmx=$min_mem + if [[ "$free" -lt "$min_mem" ]]; then + exit 1 + elif [[ "$half_free" -ge "$max_mem" ]]; then + xmx=$max_mem + elif [[ "$half_free" -lt "$min_mem" ]]; then + xmx=$min_mem + else + xmx=$half_free + fi + echo $xmx +} + +function remove_with_prompt() { + local path=$1 + local tips="" + + if [ -d "$path" ]; then + tips="Remove directory '$path' and all sub files [y/n]?" + elif [ -f "$path" ]; then + tips="Remove file '$path' [y/n]?" + else + return 0 + fi + + read -p "$tips " yn + case $yn in + [Yy]* ) rm -rf "$path";; + * ) ;; + esac +} + +function ensure_path_writable() { + local path=$1 + # Ensure input path exist + if [ ! -d "${path}" ]; then + mkdir -p ${path} + fi + # Check for write permission + if [ ! -w "${path}" ]; then + echo "No write permission on directory ${path}" + exit 1 + fi +} + +function get_ip() { + local os=`uname` + local loopback="127.0.0.1" + local ip="" + case $os in + Linux) + if command_available "ifconfig"; then + ip=`ifconfig | grep 'inet addr:' | grep -v "$loopback" | cut -d: -f2 | awk '{ print $1}'` + elif command_available "ip"; then + ip=`ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | awk -F"/" '{print $1}'` + else + ip=$loopback + fi + ;; + FreeBSD|OpenBSD|Darwin) + if command_available "ifconfig"; then + ip=`ifconfig | grep -E 'inet.[0-9]' | grep -v "$loopback" | awk '{ print $2}'` + else + ip=$loopback + fi + ;; + SunOS) + if command_available "ifconfig"; then + ip=`ifconfig -a | grep inet | grep -v "$loopback" | awk '{ print $2} '` + else + ip=$loopback + fi + ;; + *) ip=$loopback;; + esac + echo $ip +} + +function download() { + local path=$1 + local link_url=$2 + + if command_available "wget"; then + wget --help | grep -q '\--show-progress' && progress_opt="-q --show-progress" || progress_opt="" + wget ${link_url} -P ${path} $progress_opt + elif command_available "curl"; then + curl ${link_url} -o ${path}/${link_url} + else + echo "Required wget or curl but they are unavailable" + exit 1 + fi +} + +function ensure_package_exist() { + local path=$1 + local dir=$2 + local tar=$3 + local link=$4 + + if [ ! -d ${path}/${dir} ]; then + if [ ! -f ${path}/${tar} ]; then + echo "Downloading the compressed package '${tar}'" + download ${path} ${link} + if [ $? -ne 0 ]; then + echo "Failed to download, please ensure the network is available and link is valid" + exit 1 + fi + echo "[OK] Finished download" + fi + echo "Unzip the compressed package '$tar'" + tar -zxvf ${path}/${tar} -C ${path} >/dev/null 2>&1 + if [ $? -ne 0 ]; then + echo "Failed to unzip, please check the compressed package" + exit 1 + fi + echo "[OK] Finished unzip" + fi +} + +########################################################################### + +function wait_for_shutdown() { + local process_name="$1" + local pid="$2" + local timeout_s="$3" + + local now_s=`date '+%s'` + local stop_s=$(( $now_s + $timeout_s )) + + echo -n "Killing $process_name(pid $pid)" >&2 + while [ $now_s -le $stop_s ]; do + echo -n . + process_status "$process_name" "$pid" >/dev/null + if [ $? -eq 1 ]; then + echo "OK" + return 0 + fi + sleep 2 + now_s=`date '+%s'` + done + echo "$process_name shutdown timeout(exceeded $timeout_s seconds)" >&2 + return 1 +} + +function process_status() { + local process_name="$1" + local pid="$2" + + ps -p "$pid" + if [ $? -eq 0 ]; then + echo "$process_name is running with pid $pid" + return 0 + else + echo "The process $process_name does not exist" + return 1 + fi +} + +function kill_process() { + local process_name="$1" + local pid="$2" + + if [ -z "$pid" ]; then + echo "The process $pid does not exist" + return 0 + fi + + case "`uname`" in + CYGWIN*) taskkill /F /PID "$pid" ;; + *) kill "$pid" ;; + esac +} + +function kill_process_and_wait() { + local process_name="$1" + local pid="$2" + local timeout_s="$3" + + kill_process "$process_name" "$pid" + wait_for_shutdown "$process_name" "$pid" "$timeout_s" +} diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/conf/application-pd.yml b/hugegraph-store/hg-store-dist/src/assembly/static/conf/application-pd.yml new file mode 100644 index 0000000000..df535953fc --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/static/conf/application-pd.yml @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +management: + metrics: + export: + prometheus: + enabled: true + endpoints: + web: + exposure: + include: "*" + +rocksdb: + # rocksdb 使用的总内存大小,达到该值强制写盘 + total_memory_size: 32000000000 + # rocksdb 使用的 memtable 大小 + write_buffer_size: 32000000 + # 对于每个 rocksdb 来说,memtable 个数达到该值进行写盘 + min_write_buffer_number_to_merge: 16 diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/conf/application.yml b/hugegraph-store/hg-store-dist/src/assembly/static/conf/application.yml new file mode 100644 index 0000000000..4ca3d34dd6 --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/static/conf/application.yml @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pdserver: + # pd 服务地址,多个 pd 地址用逗号分割 + address: localhost:8686 + +management: + metrics: + export: + prometheus: + enabled: true + endpoints: + web: + exposure: + include: "*" + +grpc: + # grpc 的服务地址 + host: 127.0.0.1 + port: 8500 + netty-server: + max-inbound-message-size: 1000MB +raft: + # raft 缓存队列大小 + disruptorBufferSize: 1024 + address: 127.0.0.1:8510 + max-log-file-size: 600000000000 + # 快照生成时间间隔,单位秒 + snapshotInterval: 1800 +server: + # rest 服务地址 + port: 8520 + +app: + # 存储路径,支持多个路径,逗号分割 + data-path: ./storage + #raft-path: ./storage + +spring: + application: + name: store-node-grpc-server + profiles: + active: default + include: pd + +logging: + config: 'file:./conf/log4j2.xml' + level: + root: info diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/conf/log4j2.xml b/hugegraph-store/hg-store-dist/src/assembly/static/conf/log4j2.xml new file mode 100644 index 0000000000..388d09e2fd --- /dev/null +++ b/hugegraph-store/hg-store-dist/src/assembly/static/conf/log4j2.xml @@ -0,0 +1,137 @@ + + + + + + + + logs + hugegraph-store + raft-hugegraph-store + audit-hugegraph-store + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/hugegraph-store/pom.xml b/hugegraph-store/pom.xml index ea0d3dc8e2..2965bc855a 100644 --- a/hugegraph-store/pom.xml +++ b/hugegraph-store/pom.xml @@ -40,10 +40,9 @@ hg-store-test hg-store-rocksdb hg-store-core - hg-store-node - - + hg-store-dist + hg-store-cli @@ -154,7 +153,7 @@ *.tar *.tar.gz .flattened-pom.xml - + dist/** false diff --git a/install-dist/scripts/dependency/known-dependencies.txt b/install-dist/scripts/dependency/known-dependencies.txt index 26a1dd78b2..5fac276f03 100644 --- a/install-dist/scripts/dependency/known-dependencies.txt +++ b/install-dist/scripts/dependency/known-dependencies.txt @@ -143,6 +143,7 @@ hg-store-common-1.3.0.jar hg-store-core-1.3.0.jar hg-store-grpc-1.3.0.jar hg-store-rocksdb-1.3.0.jar +hg-store-node-1.3.0.jar high-scale-lib-1.0.6.jar hk2-api-3.0.1.jar hk2-locator-3.0.1.jar