diff --git a/.github/configs/settings.xml b/.github/configs/settings.xml new file mode 100644 index 000000000..294ded1cb --- /dev/null +++ b/.github/configs/settings.xml @@ -0,0 +1,60 @@ + + + + + + github + ${env.GITHUB_ACTOR} + ${env.GITHUB_TOKEN} + + + + + + local-repo + + + central + https://repo.maven.apache.org/maven2 + + true + + + false + + + + staged-releases + https://repository.apache.org/content/groups/staging/ + + + + + staged-releases + https://repository.apache.org/content/groups/staging/ + + + + + + + local-repo + + diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e64ec7d7f..5cfd38a12 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,14 +11,16 @@ jobs: computer-ci: runs-on: ubuntu-latest env: + USE_STAGE: 'true' # Whether to include the stage repository. TRAVIS_DIR: computer-dist/src/assembly/travis KUBERNETES_VERSION: 1.20.1 - HUGEGRAPH_SERVER_COMMIT_ID: d01c8737d7d5909119671953521f1401dcd1a188 BSP_ETCD_URL: http://localhost:2579 + # TODO: delete this env in the future (replaced by docker way now) + HUGEGRAPH_SERVER_COMMIT_ID: d01c8737d7d5909119671953521f1401dcd1a188 steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 2 @@ -57,12 +59,6 @@ jobs: - name: Setup Minikube-Kubernetes run: $TRAVIS_DIR/install-k8s.sh - - name: Check Component - run: | - sleep 5 - curl localhost:9000 - kubectl get nodes - - name: Cache Maven packages uses: actions/cache@v3 with: @@ -70,11 +66,16 @@ jobs: key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }} restore-keys: ${{ runner.os }}-m2 + - name: Check Component + run: | + curl localhost:9000 + kubectl get nodes + - name: Prepare env and service run: | $TRAVIS_DIR/install-env.sh - $TRAVIS_DIR/install-hugegraph-from-source.sh $HUGEGRAPH_SERVER_COMMIT_ID $TRAVIS_DIR/load-data-into-hugegraph.sh + #$TRAVIS_DIR/install-hugegraph-from-source.sh $HUGEGRAPH_SERVER_COMMIT_ID - name: Install JDK 11 uses: actions/setup-java@v3 @@ -82,8 
+83,14 @@ jobs: java-version: '11' distribution: 'zulu' + - name: Use staged maven repo + if: ${{ env.USE_STAGE == 'true' }} + run: | + cp $HOME/.m2/settings.xml /tmp/settings.xml + mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml + - name: Compile - run: mvn clean compile -Dmaven.javadoc.skip=true -ntp + run: mvn clean compile -e -Dmaven.javadoc.skip=true -ntp - name: Integrate test run: mvn test -P integrate-test -ntp @@ -92,6 +99,7 @@ jobs: run: mvn test -P unit-test -ntp - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3.0.0 + uses: codecov/codecov-action@v3 with: + token: ${{ secrets.CODECOV_TOKEN }} file: target/site/jacoco/jacoco.xml diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 7bc30f627..32bebd0a5 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -4,13 +4,15 @@ name: "CodeQL" on: pull_request: - # The branches below must be a subset of the branches above, now enable it in all PR - # branches: [ master ] + # The branches below must be a subset of the branches above, now enable it in all PR + # branches: [ master ] schedule: - cron: '45 7 * * 1' jobs: analyze: + env: + USE_STAGE: 'true' # Whether to include the stage repository. name: Analyze runs-on: ubuntu-latest permissions: @@ -24,43 +26,55 @@ jobs: language: [ 'go', 'java' ] steps: - - name: Checkout repository - uses: actions/checkout@v3 + - name: Checkout repository + uses: actions/checkout@v4 - # Initializes the CodeQL tools for scanning. - - name: Initialize CodeQL - uses: github/codeql-action/init@v2 - with: - languages: ${{ matrix.language }} - # If you wish to specify custom queries, you can do so here or in a config file. - # By default, queries listed here will override any specified in a config file. - # Prefix the list here with "+" to use these queries and those in the config file. 
- # queries: ./path/to/local/query, your-org/your-repo/queries@main + - name: Setup Java JDK + uses: actions/setup-java@v3 + with: + distribution: 'zulu' + java-version: '11' - # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). - # If this step fails, then you should remove it and run the build manually (see below) - - name: Autobuild - uses: github/codeql-action/autobuild@v2 + - name: use staged maven repo settings + if: ${{ env.USE_STAGE == 'true' }} + run: | + cp $HOME/.m2/settings.xml /tmp/settings.xml + mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml - # ℹ️ Command-line programs to run using the OS shell. - # 📚 https://git.io/JvXDl + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main - # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines - # and modify them (or add more) to build your code if your project - # uses a compiled language + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v2 - #- run: | - # make bootstrap - # make release + # ℹ️ Command-line programs to run using the OS shell. 
+ # 📚 https://git.io/JvXDl - - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v2 + # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 dependency-review: runs-on: ubuntu-latest steps: - name: 'Checkout Repository' - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: 'Dependency Review' uses: actions/dependency-review-action@v3 diff --git a/.github/workflows/license-checker.yml b/.github/workflows/license-checker.yml index ea2c3687b..caa3f85f8 100644 --- a/.github/workflows/license-checker.yml +++ b/.github/workflows/license-checker.yml @@ -15,7 +15,7 @@ jobs: check-license-header: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 # More info could refer to: https://github.com/apache/skywalking-eyes - name: Check License Header uses: apache/skywalking-eyes@main @@ -36,25 +36,29 @@ jobs: find ./ -name rat.txt -print0 | xargs -0 -I file cat file > merged-rat.txt grep "Binaries" merged-rat.txt -C 3 && cat merged-rat.txt -# TODO: enable it later -# check-dependency-license: -# runs-on: ubuntu-latest -# env: -# SCRIPT_DEPENDENCY: computer-dist/scripts/dependency -# steps: -# - name: Checkout source -# uses: actions/checkout@v3 -# - name: Set up JDK 11 -# uses: actions/setup-java@v3 -# with: -# java-version: '11' -# distribution: 'adopt' -# - name: mvn install -# run: | -# mvn install -DskipTests=true -ntp -# - name: generate current dependencies -# run: | -# bash $SCRIPT_DEPENDENCY/regenerate_known_dependencies.sh current-dependencies.txt -# - name: check third dependencies -# run: | -# bash $SCRIPT_DEPENDENCY/check_dependencies.sh + check-dependency-license: + runs-on: ubuntu-latest + env: + SCRIPT_DEPENDENCY: computer-dist/scripts/dependency + 
USE_STAGE: 'true' # Whether to include the stage repository. + steps: + - name: Checkout source + uses: actions/checkout@v4 + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + java-version: '11' + distribution: 'adopt' + - name: Use staged maven repo settings + if: ${{ env.USE_STAGE == 'true' }} + run: | + cp $HOME/.m2/settings.xml /tmp/settings.xml + mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml + - name: Compile install + run: mvn package -DskipTests=true -ntp + + # TODO: enable it after the check scripts are ready, lack them now + #- name: Generate & check current 3rd-party dependencies + # run: | + # bash $SCRIPT_DEPENDENCY/regenerate_known_dependencies.sh current-dependencies.txt + # bash $SCRIPT_DEPENDENCY/check_dependencies.sh diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 3ba4f72bf..5b5cdc21e 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -13,7 +13,7 @@ jobs: pull-requests: write steps: - - uses: actions/stale@v3 + - uses: actions/stale@v8 with: repo-token: ${{ secrets.GITHUB_TOKEN }} stale-issue-message: 'Due to the lack of activity, the current issue is marked as stale and will be closed after 20 days, any update will remove the stale label' diff --git a/checkstyle.xml b/checkstyle.xml index 96156a36d..ef00578cb 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -35,7 +35,8 @@ - + + @@ -84,7 +85,8 @@ - + diff --git a/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java b/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java index 73658f0ab..846ccaaca 100644 --- a/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java +++ b/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java @@ -59,7 +59,7 @@ public final class Constants { public static final int FUTURE_TIMEOUT = 300; /* - * The timeout in millisecond for threadpool shutdown + * The timeout in 
millisecond for thread-pool shutdown */ public static final long SHUTDOWN_TIMEOUT = 5000L; diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java index c4681be1c..ce97cb48c 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java @@ -65,8 +65,7 @@ public EtcdClient(String endpoints, String namespace) { "The endpoints can't be null"); E.checkArgumentNotNull(namespace, "The namespace can't be null"); - ByteSequence namespaceSeq = ByteSequence.from(namespace.getBytes( - ENCODING)); + ByteSequence namespaceSeq = ByteSequence.from(namespace.getBytes(ENCODING)); this.client = Client.builder().endpoints(endpoints) .namespace(namespaceSeq).build(); this.watch = this.client.getWatchClient(); @@ -80,20 +79,14 @@ public EtcdClient(String endpoints, String namespace) { * @param value value to be associated with the specified key */ public void put(String key, byte[] value) { - E.checkArgument(key != null, - "The key can't be null."); - E.checkArgument(value != null, - "The value can't be null."); + E.checkArgument(key != null, "The key can't be null."); + E.checkArgument(value != null, "The value can't be null."); try { - this.kv.put(ByteSequence.from(key, ENCODING), - ByteSequence.from(value)) - .get(); + this.kv.put(ByteSequence.from(key, ENCODING), ByteSequence.from(value)).get(); } catch (InterruptedException e) { - throw new ComputerException( - "Interrupted while putting with key='%s'", e, key); + throw new ComputerException("Interrupted while putting with key='%s'", e, key); } catch (ExecutionException e) { - throw new ComputerException("Error while putting with key='%s'", - e, key); + throw new ComputerException("Error while putting with key='%s'", e, key); } } @@ -110,8 +103,8 @@ public byte[] get(String key) { * 
Returns the value to which the specified key is mapped. * @param key The key to be found * @param throwException whether to throw ComputerException if not found. - * @return the value of specified key, null if not found and - * throwException is set false + * @return the value of the specified key, null if not found, + * and throwException is set false * @throws ComputerException if not found and throwException is set true */ public byte[] get(String key, boolean throwException) { @@ -124,24 +117,20 @@ public byte[] get(String key, boolean throwException) { assert kvs.size() == 1; return kvs.get(0).getValue().getBytes(); } else if (throwException) { - throw new ComputerException("Can't find value for key='%s'", - key); + throw new ComputerException("Can't find value for key='%s'", key); } else { return null; } } catch (InterruptedException e) { - throw new ComputerException( - "Interrupted while getting with key='%s'", e, key); + throw new ComputerException("Interrupted while getting with key='%s'", e, key); } catch (ExecutionException e) { - throw new ComputerException("Error while getting with key='%s'", - e, key); + throw new ComputerException("Error while getting with key='%s'", e, key); } } /** * Returns the value to which the specified key is mapped. If no - * key exists, wait at most timeout milliseconds. Or throw - * ComputerException if timeout + * key exists, wait for most time out milliseconds. Or throw ComputerException if timeout * @param key the key whose associated value is to be returned. * @param timeout the max time in milliseconds to wait. 
* @return the specified value in byte array to which the specified key is @@ -149,11 +138,9 @@ public byte[] get(String key, boolean throwException) { */ public byte[] get(String key, long timeout, long logInterval) { E.checkArgumentNotNull(key, "The key can't be null"); - E.checkArgument(timeout > 0L, - "The timeout must be > 0, but got: %s", timeout); - E.checkArgument(logInterval > 0L, - "The logInterval must be > 0, but got: %s", - logInterval); + E.checkArgument(timeout > 0L, "The timeout must be > 0, but got: %s", timeout); + E.checkArgument(logInterval > 0L, "The logInterval must be > 0, but got: %s", logInterval); + ByteSequence keySeq = ByteSequence.from(key, ENCODING); try { GetResponse response = this.kv.get(keySeq).get(); @@ -162,16 +149,12 @@ public byte[] get(String key, long timeout, long logInterval) { return kvs.get(0).getValue().getBytes(); } else { long revision = response.getHeader().getRevision(); - return this.waitAndGetFromPutEvent(keySeq, revision, - timeout, logInterval); + return this.waitAndGetFromPutEvent(keySeq, revision, timeout, logInterval); } } catch (InterruptedException e) { - throw new ComputerException( - "Interrupted while getting with key='%s'", - e, key); + throw new ComputerException("Interrupted while getting with key='%s'", e, key); } catch (ExecutionException e) { - throw new ComputerException("Error while getting with key='%s'", - e, key); + throw new ComputerException("Error while getting with key='%s'", e, key); } } @@ -214,8 +197,7 @@ private byte[] waitAndGetFromPutEvent(ByteSequence keySeq, long revision, .withRevision(revision) .withNoDelete(true) .build(); - try (Watch.Watcher watcher = this.watch.watch(keySeq, watchOption, - consumer)) { + try (Watch.Watcher ignored = this.watch.watch(keySeq, watchOption, consumer)) { return barrierEvent.await(timeout, logInterval, () -> { LOG.info("Wait for key '{}' with timeout {}ms", keySeq.toString(ENCODING), timeout); @@ -225,7 +207,7 @@ private byte[] 
waitAndGetFromPutEvent(ByteSequence keySeq, long revision, /** * Get the values of keys with the specified prefix. - * If no key found, return empty list. + * If no key is found, return an empty list. */ public List getWithPrefix(String prefix) { E.checkArgumentNotNull(prefix, "The prefix can't be null"); @@ -251,7 +233,7 @@ public List getWithPrefix(String prefix) { /** * Get the expected count of values of keys with the specified prefix. - * Throws ComputerException if there are no enough object. + * Throws ComputerException if there are no enough objects. */ public List getWithPrefix(String prefix, int count) { E.checkArgumentNotNull(prefix, @@ -284,12 +266,12 @@ public List getWithPrefix(String prefix, int count) { } /** - * Get expected count of values with the key prefix with prefix. If there - * is no count of keys, wait at most timeout milliseconds. + * Get the expected count of values with the key prefix with prefix. + * If there is no count of keys, wait at max timeout milliseconds. * @param prefix the key prefix - * @param count the expected count of values to be get + * @param count the expected count of values to be got * @param timeout the max wait time - * @param logInterval the interval in ms to log message + * @param logInterval the interval in ms to log a message * @return the list of values which key with specified prefix */ public List getWithPrefix(String prefix, int count, @@ -329,8 +311,8 @@ public List getWithPrefix(String prefix, int count, /** * Wait at most expected eventCount events triggered in timeout ms. - * This method wait at most timeout ms regardless whether expected - * eventCount events triggered. + * This method waits at most timeout ms regardless of whether expected-eventCount events + * triggered. 
* @param existedKeyValues readonly */ private List waitAndPrefixGetFromPutEvent( @@ -368,21 +350,18 @@ private List waitAndPrefixGetFromPutEvent( .withPrefix(prefixSeq) .withRevision(revision) .build(); - try (Watch.Watcher watcher = this.watch.watch(prefixSeq, - watchOption, - consumer)) { + try (Watch.Watcher ignored = this.watch.watch(prefixSeq, watchOption, consumer)) { return barrierEvent.await(timeout, logInterval, () -> { LOG.info("Wait for keys with prefix '{}' and timeout {}ms, " + "expect {} keys but actual got {} keys", - prefixSeq.toString(ENCODING), - timeout, count, keyValues.size()); + prefixSeq.toString(ENCODING), timeout, count, keyValues.size()); }); } } /** * @return 1 if deleted specified key, 0 if not found specified key - * The deleted data can be get through revision, if revision is compacted, + * The deleted data can be got through revision, if revision is compacted, * throw exception "etcdserver: mvcc: required revision has been compacted". * @see * Maintenance @@ -391,12 +370,10 @@ public long delete(String key) { E.checkArgumentNotNull(key, "The key can't be null"); ByteSequence keySeq = ByteSequence.from(key, ENCODING); try { - DeleteResponse response = this.client.getKVClient().delete(keySeq) - .get(); + DeleteResponse response = this.client.getKVClient().delete(keySeq).get(); return response.getDeleted(); } catch (InterruptedException e) { - throw new ComputerException("Interrupted while deleting '%s'", - e, key); + throw new ComputerException("Interrupted while deleting '%s'", e, key); } catch (ExecutionException e) { throw new ComputerException("Error while deleting '%s'", e, key); } @@ -408,12 +385,10 @@ public long delete(String key) { public long deleteWithPrefix(String prefix) { E.checkArgumentNotNull(prefix, "The prefix can't be null"); ByteSequence prefixSeq = ByteSequence.from(prefix, ENCODING); - DeleteOption deleteOption = DeleteOption.newBuilder() - .withPrefix(prefixSeq).build(); + DeleteOption deleteOption = 
DeleteOption.newBuilder().withPrefix(prefixSeq).build(); try { DeleteResponse response = this.client.getKVClient() - .delete(prefixSeq, - deleteOption) + .delete(prefixSeq, deleteOption) .get(); return response.getDeleted(); } catch (InterruptedException e) { diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java index 06fb7564c..da01fa7b2 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java @@ -66,11 +66,10 @@ public class MasterService implements Closeable { private Config config; private volatile Bsp4Master bsp4Master; private ContainerInfo masterInfo; - private List workers; private int maxSuperStep; private MasterComputation masterComputation; - private volatile ShutdownHook shutdownHook; + private final ShutdownHook shutdownHook; private volatile Thread serviceThread; public MasterService() { @@ -101,7 +100,7 @@ public void init(Config config) { rpcAddress.getPort()); /* * Connect to BSP server and clean the old data may be left by the - * previous job with same job id. + * previous job with the same job id. */ this.bsp4Master = new Bsp4Master(this.config); this.bsp4Master.clean(); @@ -114,9 +113,9 @@ public void init(Config config) { LOG.info("{} register MasterService", this); this.bsp4Master.masterInitDone(this.masterInfo); - this.workers = this.bsp4Master.waitWorkersInitDone(); + List workers = this.bsp4Master.waitWorkersInitDone(); LOG.info("{} waited all workers registered, workers count: {}", - this, this.workers.size()); + this, workers.size()); LOG.info("{} MasterService initialized", this); this.inited = true; @@ -141,24 +140,36 @@ private void registerShutdownHook() { } /** - * Stop the the master service. Stop the managers created in - * {@link #init(Config)}. 
+ * Stop the master service. Stop the managers created in {@link #init(Config)}. */ @Override public synchronized void close() { - this.checkInited(); + // TODO: check the logic of close carefully later + //this.checkInited(); if (this.closed) { LOG.info("{} MasterService had closed before", this); return; } - this.masterComputation.close(new DefaultMasterContext()); + try { + if (this.masterComputation != null) { + this.masterComputation.close(new DefaultMasterContext()); + } + } catch (Exception e) { + LOG.error("Error occurred while closing master service", e); + } - if (!failed) { + if (!failed && this.bsp4Master != null) { this.bsp4Master.waitWorkersCloseDone(); } - this.managers.closeAll(this.config); + try { + if (managers != null) { + this.managers.closeAll(this.config); + } + } catch (Exception e) { + LOG.error("Error occurred while closing managers", e); + } this.cleanAndCloseBsp(); this.shutdownHook.unhook(); @@ -333,7 +344,7 @@ private int superstepToResume() { * 1): Has run maxSuperStep times of superstep iteration. * 2): The mater-computation returns false that stop superstep iteration. * 3): All vertices are inactive and no message sent in a superstep. - * @param masterContinue The master-computation decide + * @param masterContinue The master-computation decides * @return true if finish superstep iteration. */ private boolean finishedIteration(boolean masterContinue, @@ -351,7 +362,7 @@ private boolean finishedIteration(boolean masterContinue, /** * Coordinate with workers to load vertices and edges from HugeGraph. There - * are two phases in inputstep. First phase is get input splits from + * are two phases in inputstep. The First phase is to get input splits from * master, and read the vertices and edges from input splits. The second * phase is after all workers read input splits, the workers merge the * vertices and edges to get the stats for each partition. 
@@ -371,8 +382,7 @@ private SuperstepStat inputstep() { } /** - * Wait the workers write result back. After this, the job is finished - * successfully. + * Wait the workers write a result back. After this, the job is finished successfully. */ private void outputstep() { LOG.info("{} MasterService outputstep started", this); @@ -400,7 +410,7 @@ void registerAggregator(String name, Class aggregatorClass) { "The aggregator class can't be null"); Aggregator aggr; try { - aggr = aggregatorClass.newInstance(); + aggr = aggregatorClass.getDeclaredConstructor().newInstance(); } catch (Exception e) { throw new ComputerException("Can't new instance from class: %s", e, aggregatorClass.getName()); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java index 6f904d88f..44732ad40 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java @@ -133,11 +133,9 @@ public synchronized void finish(long timeout) throws TransportException { } catch (Throwable e) { this.stateEstablished(); if (e instanceof TimeoutException) { - throw new TransportException( - "Timeout(%sms) to wait finish-response", timeout); + throw new TransportException("Timeout(%sms) to wait finish-response", timeout); } else { - throw new TransportException("Failed to wait finish-response", - e); + throw new TransportException("Failed to wait finish-response", e); } } finally { this.finishedFutureRef.compareAndSet(finishFuture, null); @@ -150,13 +148,11 @@ public synchronized CompletableFuture finishAsync() { "at finishAsync()", this.state); CompletableFuture finishedFuture = new CompletableFuture<>(); - boolean success = this.finishedFutureRef.compareAndSet(null, - finishedFuture); + boolean success = 
this.finishedFutureRef.compareAndSet(null, finishedFuture); E.checkArgument(success, "The finishedFutureRef value must be null " + "at finishAsync()"); int finishId = this.genFinishId(); - this.stateFinishSent(finishId); try { FinishMessage finishMessage = new FinishMessage(finishId); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java index fc5b5dc56..8d776b327 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java @@ -61,22 +61,20 @@ public class WorkerService implements Closeable { private static final Logger LOG = Log.logger(WorkerService.class); + private volatile boolean inited; + private volatile boolean closed; + private final ComputerContext context; - private final Managers managers; private final Map workers; + private final Managers managers; + private final ShutdownHook shutdownHook; - private volatile boolean inited; - private volatile boolean closed; - private Config config; private Bsp4Worker bsp4Worker; + private Config config; private ComputeManager computeManager; private ContainerInfo workerInfo; - private Combiner combiner; - private ContainerInfo masterInfo; - - private volatile ShutdownHook shutdownHook; private volatile Thread serviceThread; public WorkerService() { @@ -91,57 +89,67 @@ public WorkerService() { /** * Init worker service, create the managers used by worker service. 
*/ - public void init(Config config) { - E.checkArgument(!this.inited, "The %s has been initialized", this); - - this.serviceThread = Thread.currentThread(); - this.registerShutdownHook(); - - this.config = config; - - this.workerInfo = new ContainerInfo(); - LOG.info("{} Start to initialize worker", this); - - this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo); + public synchronized void init(Config config) { + try { + LOG.info("{} Prepare to init WorkerService", this); + // TODO: what will happen if init() called by multiple threads? + E.checkArgument(!this.inited, "The %s has been initialized", this); - /* - * Keep the waitMasterInitDone() called before initManagers(), - * in order to ensure master init() before worker managers init() - */ - this.masterInfo = this.bsp4Worker.waitMasterInitDone(); + this.serviceThread = Thread.currentThread(); + this.registerShutdownHook(); + this.config = config; + this.workerInfo = new ContainerInfo(); - InetSocketAddress address = this.initManagers(this.masterInfo); - this.workerInfo.updateAddress(address); + LOG.info("{} Start to initialize worker", this); + this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo); + /* + * Keep the waitMasterInitDone() called before initManagers(), + * in order to ensure master init() before worker managers init() + */ + ContainerInfo masterInfo = this.bsp4Worker.waitMasterInitDone(); + InetSocketAddress address = this.initManagers(masterInfo); + this.workerInfo.updateAddress(address); + this.loadComputation(); + + LOG.info("{} register WorkerService", this); + this.bsp4Worker.workerInitDone(); + this.connectToWorkers(); + + this.computeManager = new ComputeManager(this.workerInfo.id(), this.context, + this.managers); + + this.managers.initedAll(this.config); + LOG.info("{} WorkerService initialized", this); + this.inited = true; + } catch (Exception e) { + LOG.error("Error while initializing WorkerService", e); + // TODO: shall we call close() here? 
+ throw e; + } + } + private void loadComputation() { Computation computation = this.config.createObject( - ComputerOptions.WORKER_COMPUTATION_CLASS); + ComputerOptions.WORKER_COMPUTATION_CLASS); LOG.info("Loading computation '{}' in category '{}'", computation.name(), computation.category()); - this.combiner = this.config.createObject( - ComputerOptions.WORKER_COMBINER_CLASS, false); + this.combiner = this.config.createObject(ComputerOptions.WORKER_COMBINER_CLASS, false); if (this.combiner == null) { - LOG.info("None combiner is provided for computation '{}'", - computation.name()); + LOG.info("None combiner is provided for computation '{}'", computation.name()); } else { LOG.info("Combiner '{}' is provided for computation '{}'", this.combiner.name(), computation.name()); } + } - LOG.info("{} register WorkerService", this); - this.bsp4Worker.workerInitDone(); + private void connectToWorkers() { List workers = this.bsp4Worker.waitMasterAllInitDone(); DataClientManager dm = this.managers.get(DataClientManager.NAME); for (ContainerInfo worker : workers) { this.workers.put(worker.id(), worker); dm.connect(worker.id(), worker.hostname(), worker.dataPort()); } - - this.computeManager = new ComputeManager(this.workerInfo.id(), this.context, this.managers); - - this.managers.initedAll(this.config); - LOG.info("{} WorkerService initialized", this); - this.inited = true; } private void registerShutdownHook() { @@ -152,29 +160,50 @@ private void registerShutdownHook() { } /** - * Stop the worker service. Stop the managers created in - * {@link #init(Config)}. + * Stop the worker service. Stop the managers created in {@link #init(Config)}. */ @Override public synchronized void close() { - this.checkInited(); + // TODO: why checkInited() here, if init throws exception, how to close the resource? 
+ //this.checkInited(); if (this.closed) { LOG.info("{} WorkerService had closed before", this); return; } - this.computeManager.close(); + try { + if (this.computeManager != null) { + this.computeManager.close(); + } else { + LOG.warn("The computeManager is null"); + return; + } + } catch (Exception e) { + LOG.error("Error when closing ComputeManager", e); + } /* * Seems managers.closeAll() would do the following actions: * TODO: close the connection to other workers. * TODO: stop the connection to the master * TODO: stop the data transportation server. */ - this.managers.closeAll(this.config); + try { + this.managers.closeAll(this.config); + } catch (Exception e) { + LOG.error("Error while closing managers", e); + } - this.bsp4Worker.workerCloseDone(); - this.bsp4Worker.close(); - this.shutdownHook.unhook(); + try { + this.bsp4Worker.workerCloseDone(); + this.bsp4Worker.close(); + } catch (Exception e) { + LOG.error("Error while closing bsp4Worker", e); + } + try { + this.shutdownHook.unhook(); + } catch (Exception e) { + LOG.error("Error while unhooking shutdownHook", e); + } this.closed = true; LOG.info("{} WorkerService closed", this); @@ -201,14 +230,13 @@ private void cleanAndCloseBsp() { } /** - * Execute the superstep in worker. It first wait master witch superstep + * Execute the superstep in worker. It first waits master witch superstep * to start from. And then do the superstep iteration until master's * superstepStat is inactive. */ public void execute() { - this.checkInited(); - LOG.info("{} WorkerService execute", this); + this.checkInited(); // TODO: determine superstep if fail over is enabled. int superstep = this.bsp4Worker.waitMasterResumeDone(); @@ -227,8 +255,7 @@ public void execute() { * superstep. 
*/ while (superstepStat.active()) { - WorkerContext context = new SuperstepContext(superstep, - superstepStat); + WorkerContext context = new SuperstepContext(superstep, superstepStat); LOG.info("Start computation of superstep {}", superstep); if (superstep > 0) { this.computeManager.takeRecvedMessages(); @@ -242,13 +269,12 @@ public void execute() { this.managers.beforeSuperstep(this.config, superstep); /* - * Notify master by each worker, when the master received all + * Notify the master by each worker, when the master received all * workers signal, then notify all workers to do compute(). */ this.bsp4Worker.workerStepPrepareDone(superstep); this.bsp4Worker.waitMasterStepPrepareDone(superstep); - WorkerStat workerStat = this.computeManager.compute(context, - superstep); + WorkerStat workerStat = this.computeManager.compute(context, superstep); this.bsp4Worker.workerStepComputeDone(superstep); this.bsp4Worker.waitMasterStepComputeDone(superstep); @@ -274,8 +300,7 @@ public void execute() { @Override public String toString() { - Object id = this.workerInfo == null ? - "?" + this.hashCode() : this.workerInfo.id(); + Object id = this.workerInfo == null ? "?" + this.hashCode() : this.workerInfo.id(); return String.format("[worker %s]", id); } @@ -287,13 +312,11 @@ private InetSocketAddress initManagers(ContainerInfo masterInfo) { * NOTE: this init() method will be called twice, will be ignored at * the 2nd time call. 
*/ - WorkerRpcManager.updateRpcRemoteServerConfig(this.config, - masterInfo.hostname(), + WorkerRpcManager.updateRpcRemoteServerConfig(this.config, masterInfo.hostname(), masterInfo.rpcPort()); rpcManager.init(this.config); - WorkerAggrManager aggregatorManager = new WorkerAggrManager( - this.context); + WorkerAggrManager aggregatorManager = new WorkerAggrManager(this.context); aggregatorManager.service(rpcManager.aggregateRpcService()); this.managers.add(aggregatorManager); FileManager fileManager = new FileManager(); @@ -307,30 +330,22 @@ private InetSocketAddress initManagers(ContainerInfo masterInfo) { this.managers.add(recvManager); ConnectionManager connManager = new TransportConnectionManager(); - DataServerManager serverManager = new DataServerManager(connManager, - recvManager); + DataServerManager serverManager = new DataServerManager(connManager, recvManager); this.managers.add(serverManager); - DataClientManager clientManager = new DataClientManager(connManager, - this.context); + DataClientManager clientManager = new DataClientManager(connManager, this.context); this.managers.add(clientManager); SortManager sendSortManager = new SendSortManager(this.context); this.managers.add(sendSortManager); - MessageSendManager sendManager = new MessageSendManager(this.context, - sendSortManager, - clientManager.sender()); + MessageSendManager sendManager = new MessageSendManager(this.context, sendSortManager, + clientManager.sender()); this.managers.add(sendManager); - - SnapshotManager snapshotManager = new SnapshotManager(this.context, - sendManager, - recvManager, - this.workerInfo); + SnapshotManager snapshotManager = new SnapshotManager(this.context, sendManager, + recvManager, this.workerInfo); this.managers.add(snapshotManager); - - WorkerInputManager inputManager = new WorkerInputManager(this.context, - sendManager, + WorkerInputManager inputManager = new WorkerInputManager(this.context, sendManager, snapshotManager); 
inputManager.service(rpcManager.inputSplitService()); this.managers.add(inputManager); @@ -339,8 +354,8 @@ private InetSocketAddress initManagers(ContainerInfo masterInfo) { this.managers.initAll(this.config); InetSocketAddress address = serverManager.address(); - LOG.info("{} WorkerService initialized managers with data server " + - "address '{}'", this, address); + LOG.info("{} WorkerService initialized managers with data server address '{}'", + this, address); return address; } @@ -350,7 +365,7 @@ private void checkInited() { } /** * Load vertices and edges from HugeGraph. There are two phases in - * inputstep. First phase is get input splits from master, and read the + * inputstep. The first phase is to get input splits from master, and read the * vertices and edges from input splits. The second phase is after all * workers read input splits, the workers merge the vertices and edges to * get the stats for each partition. @@ -365,10 +380,8 @@ private SuperstepStat inputstep() { WorkerStat workerStat = this.computeManager.input(); - this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP, - workerStat); - SuperstepStat superstepStat = this.bsp4Worker.waitMasterStepDone( - Constants.INPUT_SUPERSTEP); + this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP, workerStat); + SuperstepStat superstepStat = this.bsp4Worker.waitMasterStepDone(Constants.INPUT_SUPERSTEP); manager.close(this.config); LOG.info("{} WorkerService inputstep finished", this); return superstepStat; @@ -395,10 +408,8 @@ private class SuperstepContext implements WorkerContext { private SuperstepContext(int superstep, SuperstepStat superstepStat) { this.superstep = superstep; this.superstepStat = superstepStat; - this.aggrManager = WorkerService.this.managers.get( - WorkerAggrManager.NAME); - this.sendManager = WorkerService.this.managers.get( - MessageSendManager.NAME); + this.aggrManager = WorkerService.this.managers.get(WorkerAggrManager.NAME); + this.sendManager = 
WorkerService.this.managers.get(MessageSendManager.NAME); } @Override diff --git a/computer-dist/src/assembly/dataset/struct.json b/computer-dist/src/assembly/dataset/struct.json index ed571ca4f..c1d8be1ec 100644 --- a/computer-dist/src/assembly/dataset/struct.json +++ b/computer-dist/src/assembly/dataset/struct.json @@ -6,7 +6,7 @@ "skip": false, "input": { "type": "FILE", - "path": "computer-dist/src/assembly/dataset/ml-latest-small/ratings.csv", + "path": "/dataset/ml-latest-small/ratings.csv", "file_filter": { "extensions": [ "*" @@ -83,7 +83,7 @@ "skip": false, "input": { "type": "FILE", - "path": "computer-dist/src/assembly/dataset/ml-latest-small/tags.csv", + "path": "/dataset/ml-latest-small/tags.csv", "file_filter": { "extensions": [ "*" @@ -160,7 +160,7 @@ "skip": false, "input": { "type": "FILE", - "path": "computer-dist/src/assembly/dataset/ml-latest-small/movies.csv", + "path": "/dataset/ml-latest-small/movies.csv", "file_filter": { "extensions": [ "*" diff --git a/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh b/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh index af5281de5..b5365caa9 100755 --- a/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh +++ b/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# + +# Note: this script is not used in github-ci now, keep it for other env set -ev if [[ $# -ne 1 ]]; then @@ -40,4 +42,5 @@ chmod -R 755 bin/ bin/init-store.sh || exit 1 bin/start-hugegraph.sh || cat logs/hugegraph-server.log + cd ../ diff --git a/computer-dist/src/assembly/travis/install-k8s.sh b/computer-dist/src/assembly/travis/install-k8s.sh index 4619fa911..d18f49283 100755 --- a/computer-dist/src/assembly/travis/install-k8s.sh +++ b/computer-dist/src/assembly/travis/install-k8s.sh @@ -17,6 +17,7 @@ # set -ev +# TODO: could replace by docker way curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 && chmod +x minikube sudo mkdir -p /usr/local/bin/ sudo install minikube /usr/local/bin/ diff --git a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh index e00890b6a..b448b6619 100755 --- a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh +++ b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh @@ -21,27 +21,43 @@ set -ev TRAVIS_DIR=$(dirname "$0") DATASET_DIR=${TRAVIS_DIR}/../dataset -HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git" - -git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain - -cd hugegraph-toolchain -mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp - -cd hugegraph-loader -tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1 -cd ../../ +docker network create ci +# Note: we need wait for server start finished, so start it first +docker run -itd --name=graph --network ci -p 8080:8080 hugegraph/hugegraph:latest && sleep 6 wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip unzip -d ${DATASET_DIR} ml-latest-small.zip -hugegraph-toolchain/hugegraph-loader/apache-hugegraph-loader-*/bin/hugegraph-loader.sh \ --g hugegraph -f ${DATASET_DIR}/struct.json -s ${DATASET_DIR}/schema.groovy || exit 1 +cd ${DATASET_DIR}/.. 
&& pwd && ls -lh * + +docker run -id --name=loader --network ci hugegraph/loader:latest +docker cp dataset loader:/dataset || exit 1 + +docker exec -i loader ls -lh /dataset +docker exec -i loader bin/hugegraph-loader.sh -g hugegraph -p 8080 -h graph \ + -f /dataset/struct.json -s /dataset/schema.groovy || exit 1 # load dataset to hdfs -sort -t , -k1n -u "${DATASET_DIR}"/ml-latest-small/ratings.csv | cut -d "," -f 1 > "${DATASET_DIR}"/ml-latest-small/user_id.csv || exit 1 +sort -t , -k1n -u dataset/ml-latest-small/ratings.csv | cut -d "," -f 1 >dataset/ml-latest-small/user_id.csv || exit 1 /opt/hadoop/bin/hadoop fs -mkdir -p /dataset/ml-latest-small || exit 1 -/opt/hadoop/bin/hadoop fs -put "${DATASET_DIR}"/ml-latest-small/* /dataset/ml-latest-small || exit 1 +/opt/hadoop/bin/hadoop fs -put dataset/ml-latest-small/* /dataset/ml-latest-small || exit 1 /opt/hadoop/bin/hadoop fs -ls /dataset/ml-latest-small echo "Load finished, continue to next step" + +############# Note: this part is not used in github-ci now, backup it for other env ############## +#HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git" +#git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain +# +#cd hugegraph-toolchain +#mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp +# +#cd hugegraph-loader +#tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1 +#cd ../../ + +#wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip +#unzip -d ${DATASET_DIR} ml-latest-small.zip + +#hugegraph-toolchain/hugegraph-loader/apache-hugegraph-loader-*/bin/hugegraph-loader.sh \ +# -g hugegraph -f ${DATASET_DIR}/struct.json -s ${DATASET_DIR}/schema.groovy || exit 1 diff --git a/computer-dist/src/assembly/travis/start-etcd.sh b/computer-dist/src/assembly/travis/start-etcd.sh index 896d0832e..fe3893ac7 100644 --- a/computer-dist/src/assembly/travis/start-etcd.sh +++ b/computer-dist/src/assembly/travis/start-etcd.sh @@ -17,8 +17,9 @@ # 
set -ev -TRAVIS_DIR=`dirname $0` +TRAVIS_DIR=$(dirname $0) echo "Starting etcd..." +# TODO: replace with docker way wget -O ${TRAVIS_DIR}/etcd.tar.gz https://github.com/etcd-io/etcd/releases/download/v3.5.0/etcd-v3.5.0-linux-amd64.tar.gz mkdir ${TRAVIS_DIR}/etcd tar -zxvf ${TRAVIS_DIR}/etcd.tar.gz -C ${TRAVIS_DIR}/etcd --strip-components 1 diff --git a/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java b/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java index 9934f5137..c440551ad 100644 --- a/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java +++ b/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -36,7 +37,6 @@ import org.apache.commons.configuration2.MapConfiguration; import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpStatus; import org.apache.hugegraph.computer.k8s.Constants; import org.apache.hugegraph.computer.k8s.operator.common.AbstractController; import org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions; @@ -61,6 +61,11 @@ import io.fabric8.kubernetes.client.informers.SharedInformerFactory; import io.fabric8.kubernetes.client.utils.Utils; +/** + * The OperatorEntrypoint class is the main entry point for the Kubernetes operator. + * It sets up the Kubernetes client, registers controllers, and starts the HTTP server for + * health checks. 
+ */ public class OperatorEntrypoint { private static final Logger LOG = Log.logger(OperatorEntrypoint.class); @@ -74,15 +79,13 @@ public class OperatorEntrypoint { public static void main(String[] args) { OperatorEntrypoint operatorEntrypoint = new OperatorEntrypoint(); - Runtime.getRuntime().addShutdownHook( - new Thread(operatorEntrypoint::shutdown)); + Runtime.getRuntime().addShutdownHook(new Thread(operatorEntrypoint::shutdown)); operatorEntrypoint.start(); } static { - OptionSpace.register( - "computer-k8s-operator", - "org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions" + OptionSpace.register("computer-k8s-operator", + "org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions" ); } @@ -98,8 +101,7 @@ public OperatorEntrypoint() { public void start() { try { this.kubeClient = new DefaultKubernetesClient(); - String watchNamespace = this.config.get( - OperatorOptions.WATCH_NAMESPACE); + String watchNamespace = this.config.get(OperatorOptions.WATCH_NAMESPACE); if (!Objects.equals(watchNamespace, Constants.ALL_NAMESPACE)) { this.createNamespace(watchNamespace); this.kubeClient = this.kubeClient.inNamespace(watchNamespace); @@ -108,19 +110,17 @@ public void start() { LOG.info("Watch namespace: " + watchNamespace); this.addHealthCheck(); - this.registerControllers(); this.informerFactory.startAllRegisteredInformers(); this.informerFactory.addSharedInformerEventListener(exception -> { - LOG.error("Informer event listener exception occurred", - exception); + LOG.error("Informer event listener exception occurred", exception); OperatorEntrypoint.this.shutdown(); }); - // Start all controller - this.controllerPool = ExecutorUtil.newFixedThreadPool( - this.controllers.size(), "controllers-%d"); + // Start all controllers + this.controllerPool = ExecutorUtil.newFixedThreadPool(this.controllers.size(), + "controllers-%d"); CountDownLatch latch = new CountDownLatch(this.controllers.size()); List> futures = new ArrayList<>(); for (AbstractController 
controller : this.controllers) { @@ -141,8 +141,7 @@ public void start() { } }); - CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{})) - .get(); + CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{})).get(); } catch (Throwable throwable) { LOG.error("Failed to start Operator: ", throwable); } finally { @@ -201,16 +200,14 @@ private HugeConfig configFromSysPropsOrEnvVars() { } private void registerControllers() { - ComputerJobController jobController = new ComputerJobController( - this.config, this.kubeClient); - this.registerController(jobController, - ConfigMap.class, Job.class, Pod.class); + ComputerJobController jobController = new ComputerJobController(this.config, + this.kubeClient); + this.registerController(jobController, ConfigMap.class, Job.class, Pod.class); } @SafeVarargs - private final void registerController( - AbstractController controller, - Class... ownsClass) { + private void registerController(AbstractController controller, + Class... ownsClass) { controller.register(this.informerFactory, ownsClass); this.controllers.add(controller); } @@ -222,7 +219,7 @@ private void addHealthCheck() throws IOException { this.httpServer = HttpServer.create(address, probeBacklog); this.httpServer.createContext("/health", httpExchange -> { byte[] bytes = "ALL GOOD!".getBytes(StandardCharsets.UTF_8); - httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length); + httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, bytes.length); OutputStream responseBody = httpExchange.getResponseBody(); responseBody.write(bytes); responseBody.close(); @@ -233,7 +230,7 @@ private void addHealthCheck() throws IOException { private void addReadyCheck() { this.httpServer.createContext("/ready", httpExchange -> { byte[] bytes = "ALL Ready!".getBytes(StandardCharsets.UTF_8); - httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length); + httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, bytes.length); OutputStream responseBody 
= httpExchange.getResponseBody(); responseBody.write(bytes); responseBody.close(); @@ -245,8 +242,7 @@ private void createNamespace(String namespace) { .withName(namespace) .endMetadata(); KubeUtil.ignoreExists(() -> { - return this.kubeClient.namespaces() - .create(builder.build()); + return this.kubeClient.namespaces().create(builder.build()); }); } } diff --git a/computer-test/pom.xml b/computer-test/pom.xml index 4b55da32f..16f17a7fa 100644 --- a/computer-test/pom.xml +++ b/computer-test/pom.xml @@ -38,6 +38,12 @@ org.apache.hugegraph hugegraph-common + + + kotlin-stdlib-jdk8 + org.jetbrains.kotlin + + org.apache.hugegraph @@ -96,11 +102,37 @@ 3.8.0 compile + + com.squareup.okhttp3 + mockwebserver + 4.12.0 + test + + + junit + junit + + + kotlin-stdlib + org.jetbrains.kotlin + + + io.fabric8 kubernetes-server-mock 5.6.0 compile + + + mockwebserver + com.squareup.okhttp3 + + + jackson-databind + com.fasterxml.jackson.core + + diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java index acf48bab0..6d132a83c 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java @@ -53,18 +53,12 @@ public class NettyTransportClientTest extends AbstractNetworkTest { @Override protected void initOption() { - super.updateOption(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS, - 8); - super.updateOption(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS, - 6); - super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK, - 64 * (int) Bytes.MB); - super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK, - 32 * (int) Bytes.MB); - super.updateOption(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL, - 200L); - 
super.updateOption(ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT, - 30_000L); + super.updateOption(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS, 8); + super.updateOption(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS, 6); + super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK, 64 * (int) Bytes.MB); + super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK, 32 * (int) Bytes.MB); + super.updateOption(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL, 200L); + super.updateOption(ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT, 30_000L); } @Test @@ -124,12 +118,9 @@ public void testSend() throws IOException { NettyTransportClient client = (NettyTransportClient) this.oneClient(); for (int i = 0; i < 3; i++) { client.startSession(); - client.send(MessageType.MSG, 1, - ByteBuffer.wrap(StringEncoding.encode("test1"))); - client.send(MessageType.VERTEX, 2, - ByteBuffer.wrap(StringEncoding.encode("test2"))); - client.send(MessageType.EDGE, 3, - ByteBuffer.wrap(StringEncoding.encode("test3"))); + client.send(MessageType.MSG, 1, ByteBuffer.wrap(StringEncoding.encode("test1"))); + client.send(MessageType.VERTEX, 2, ByteBuffer.wrap(StringEncoding.encode("test2"))); + client.send(MessageType.EDGE, 3, ByteBuffer.wrap(StringEncoding.encode("test3"))); client.finishSession(); } } @@ -172,8 +163,7 @@ public void testDataUniformity() throws IOException { Assert.assertNotSame(sourceBytes, bytes); return null; - }).when(serverHandler).handle(Mockito.any(), Mockito.eq(1), - Mockito.any()); + }).when(serverHandler).handle(Mockito.any(), Mockito.eq(1), Mockito.any()); client.startSession(); client.send(MessageType.MSG, 1, ByteBuffer.wrap(sourceBytes1)); @@ -187,35 +177,25 @@ public void testStartSessionWithTimeout() throws IOException { NettyTransportClient client = (NettyTransportClient) this.oneClient(); Function sendFunc = message -> null; - Whitebox.setInternalState(client.clientSession(), - "sendFunction", sendFunc); + 
Whitebox.setInternalState(client.clientSession(), "sendFunction", sendFunc); - Assert.assertThrows(TransportException.class, () -> { - client.startSession(); - }, e -> { - Assert.assertContains("to wait start-response", - e.getMessage()); + Assert.assertThrows(TransportException.class, client::startSession, e -> { + Assert.assertContains("to wait start-response", e.getMessage()); }); } @Test public void testFinishSessionWithTimeout() throws IOException { NettyTransportClient client = (NettyTransportClient) this.oneClient(); - client.startSession(); Function sendFunc = message -> null; - Whitebox.setInternalState(client.clientSession(), - "sendFunction", sendFunc); + Whitebox.setInternalState(client.clientSession(), "sendFunction", sendFunc); - Whitebox.setInternalState(client, "timeoutFinishSession", - 1000L); + Whitebox.setInternalState(client, "timeoutFinishSession", 1000L); - Assert.assertThrows(TransportException.class, () -> { - client.finishSession(); - }, e -> { - Assert.assertContains("to wait finish-response", - e.getMessage()); + Assert.assertThrows(TransportException.class, client::finishSession, e -> { + Assert.assertContains("to wait finish-response", e.getMessage()); }); } @@ -224,18 +204,14 @@ public void testStartSessionWithSendException() throws IOException { NettyTransportClient client = (NettyTransportClient) this.oneClient(); @SuppressWarnings("unchecked") - Function sendFunc = - Mockito.mock(Function.class); - Whitebox.setInternalState(client.clientSession(), - "sendFunction", sendFunc); + Function sendFunc = Mockito.mock(Function.class); + Whitebox.setInternalState(client.clientSession(), "sendFunction", sendFunc); Mockito.doThrow(new RuntimeException("test exception")) .when(sendFunc) .apply(Mockito.any()); - Assert.assertThrows(RuntimeException.class, () -> { - client.startSession(); - }, e -> { + Assert.assertThrows(RuntimeException.class, client::startSession, e -> { Assert.assertContains("test exception", e.getMessage()); }); } @@ -243,38 
+219,31 @@ public void testStartSessionWithSendException() throws IOException { @Test public void testFinishSessionWithSendException() throws IOException { NettyTransportClient client = (NettyTransportClient) this.oneClient(); - client.startSession(); @SuppressWarnings("unchecked") Function> sendFunc = Mockito.mock(Function.class); - Whitebox.setInternalState(client.clientSession(), - "sendFunction", sendFunc); + Whitebox.setInternalState(client.clientSession(), "sendFunction", sendFunc); Mockito.doThrow(new RuntimeException("test exception")) .when(sendFunc) .apply(Mockito.any()); - Assert.assertThrows(RuntimeException.class, () -> { - client.finishSession(); - }, e -> { + Assert.assertThrows(RuntimeException.class, client::finishSession, e -> { Assert.assertContains("test exception", e.getMessage()); }); } @Test public void testFlowControl() throws IOException { - ByteBuffer buffer = ByteBuffer.wrap( - StringEncoding.encode("test data")); + ByteBuffer buffer = ByteBuffer.wrap(StringEncoding.encode("test data")); NettyTransportClient client = (NettyTransportClient) this.oneClient(); client.startSession(); - Object sendFuncBak = Whitebox.getInternalState(client.clientSession(), - "sendFunction"); + Object sendFuncBak = Whitebox.getInternalState(client.clientSession(), "sendFunction"); Function sendFunc = message -> null; - Whitebox.setInternalState(client.clientSession(), - "sendFunction", sendFunc); + Whitebox.setInternalState(client.clientSession(), "sendFunction", sendFunc); for (int i = 1; i <= conf.maxPendingRequests() * 2; i++) { boolean send = client.send(MessageType.MSG, 1, buffer); @@ -285,10 +254,8 @@ public void testFlowControl() throws IOException { } } - int maxRequestId = Whitebox.getInternalState(client.clientSession(), - "maxRequestId"); - int maxAckId = Whitebox.getInternalState(client.clientSession(), - "maxAckId"); + int maxRequestId = Whitebox.getInternalState(client.clientSession(), "maxRequestId"); + int maxAckId = 
Whitebox.getInternalState(client.clientSession(), "maxAckId"); Assert.assertEquals(conf.maxPendingRequests(), maxRequestId); Assert.assertEquals(0, maxAckId); @@ -299,18 +266,15 @@ public void testFlowControl() throws IOException { } Assert.assertTrue(client.checkSendAvailable()); - maxAckId = Whitebox.getInternalState(client.clientSession(), - "maxAckId"); + maxAckId = Whitebox.getInternalState(client.clientSession(), "maxAckId"); Assert.assertEquals(pendings + 1, maxAckId); - Whitebox.setInternalState(client.clientSession(), "sendFunction", - sendFuncBak); + Whitebox.setInternalState(client.clientSession(), "sendFunction", sendFuncBak); } @Test public void testHandlerException() throws IOException { NettyTransportClient client = (NettyTransportClient) this.oneClient(); - client.startSession(); Mockito.doThrow(new RuntimeException("test exception")).when(serverHandler) @@ -320,14 +284,10 @@ public void testHandlerException() throws IOException { boolean send = client.send(MessageType.MSG, 1, buffer); Assert.assertTrue(send); - Whitebox.setInternalState(client, "timeoutFinishSession", - 1000L); + Whitebox.setInternalState(client, "timeoutFinishSession", 1000L); - Assert.assertThrows(TransportException.class, () -> { - client.finishSession(); - }, e -> { - Assert.assertContains("to wait finish-response", - e.getMessage()); + Assert.assertThrows(TransportException.class, client::finishSession, e -> { + Assert.assertContains("finish-response", e.getMessage()); }); Mockito.verify(serverHandler, Mockito.timeout(10_000L).times(1)) @@ -344,13 +304,11 @@ public void testCheckMinPendingRequests() { TransportConf conf = TransportConf.wrapConfig(config); - Assert.assertThrows(IllegalArgumentException.class, - conf::minPendingRequests); + Assert.assertThrows(IllegalArgumentException.class, conf::minPendingRequests); } @Test - public void testSessionActive() throws IOException, InterruptedException, - ExecutionException, + public void testSessionActive() throws IOException, 
InterruptedException, ExecutionException, TimeoutException { NettyTransportClient client = (NettyTransportClient) this.oneClient(); diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java index b423d3861..bba8b9e54 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java @@ -72,17 +72,10 @@ public void setup() { this.fileManager.init(this.config); this.sortManager = new RecvSortManager(context()); this.sortManager.init(this.config); - this.receiveManager = new MessageRecvManager(context(), - this.fileManager, - this.sortManager); - this.snapshotManager = new SnapshotManager(context(), - null, - receiveManager, - null); + this.receiveManager = new MessageRecvManager(context(), this.fileManager, this.sortManager); + this.snapshotManager = new SnapshotManager(context(), null, receiveManager, null); this.receiveManager.init(this.config); - this.connectionId = new ConnectionId( - new InetSocketAddress("localhost",8081), - 0); + this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), 0); } @After @@ -94,31 +87,28 @@ public void teardown() { @Test public void testVertexAndEdgeMessage() throws IOException { - // Send vertex message + // Send vertex messages this.receiveManager.onStarted(this.connectionId); this.receiveManager.onFinished(this.connectionId); - VertexMessageRecvPartitionTest.addTenVertexBuffer( - (NetworkBuffer buffer) -> { + VertexMessageRecvPartitionTest.addTenVertexBuffer((NetworkBuffer buffer) -> { this.receiveManager.handle(MessageType.VERTEX, 0, buffer); }); - EdgeMessageRecvPartitionTest.addTenEdgeBuffer( - (NetworkBuffer buffer) -> { + EdgeMessageRecvPartitionTest.addTenEdgeBuffer((NetworkBuffer buffer) -> { 
this.receiveManager.handle(MessageType.EDGE, 0, buffer); }); - // Send edge message + // Send edge messages this.receiveManager.onStarted(this.connectionId); this.receiveManager.onFinished(this.connectionId); this.receiveManager.waitReceivedAllMessages(); Map> vertexPartitions = - this.receiveManager.vertexPartitions(); + this.receiveManager.vertexPartitions(); Map> edgePartitions = - this.receiveManager.edgePartitions(); + this.receiveManager.edgePartitions(); Assert.assertEquals(1, vertexPartitions.size()); Assert.assertEquals(1, edgePartitions.size()); - VertexMessageRecvPartitionTest.checkPartitionIterator( - vertexPartitions.get(0)); + VertexMessageRecvPartitionTest.checkPartitionIterator(vertexPartitions.get(0)); EdgeMessageRecvPartitionTest.checkTenEdges(edgePartitions.get(0)); } @@ -126,9 +116,8 @@ public void testVertexAndEdgeMessage() throws IOException { public void testComputeMessage() throws IOException { // Superstep 0 this.receiveManager.beforeSuperstep(this.config, 0); - ComputeMessageRecvPartitionTest.addTwentyCombineMessageBuffer( - (NetworkBuffer buffer) -> { - this.receiveManager.handle(MessageType.MSG, 0, buffer); + ComputeMessageRecvPartitionTest.addTwentyCombineMessageBuffer((NetworkBuffer buffer) -> { + this.receiveManager.handle(MessageType.MSG, 0, buffer); }); this.receiveManager.onFinished(this.connectionId); @@ -136,17 +125,15 @@ public void testComputeMessage() throws IOException { this.receiveManager.afterSuperstep(this.config, 0); Map> messagePartitions = - this.receiveManager.messagePartitions(); + this.receiveManager.messagePartitions(); Assert.assertEquals(1, messagePartitions.size()); - ComputeMessageRecvPartitionTest.checkTenCombineMessages( - messagePartitions.get(0)); + ComputeMessageRecvPartitionTest.checkTenCombineMessages(messagePartitions.get(0)); } @Test public void testOtherMessageType() { Assert.assertThrows(ComputerException.class, () -> { - ReceiverUtil.consumeBuffer(new byte[100], - (NetworkBuffer buffer) -> { + 
ReceiverUtil.consumeBuffer(new byte[100], (NetworkBuffer buffer) -> { this.receiveManager.handle(MessageType.ACK, 0, buffer); }); }, e -> { @@ -161,8 +148,7 @@ public void testNotEnoughFinishMessages() { Assert.assertThrows(ComputerException.class, () -> { this.receiveManager.waitReceivedAllMessages(); }, e -> { - Assert.assertContains("Expect 1 finish-messages", - e.getMessage()); + Assert.assertContains("finish-messages", e.getMessage()); }); } } diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java index b056c7d51..3e7d1c0ba 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java @@ -47,56 +47,48 @@ public void testServiceWith1Worker() throws InterruptedException { pool.submit(() -> { Config config = UnitTestBase.updateWithRequiredOptions( - ComputerOptions.JOB_ID, "local_002", - ComputerOptions.JOB_WORKERS_COUNT, "1", - ComputerOptions.TRANSPORT_SERVER_PORT, "8086", - ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", - ComputerOptions.BSP_LOG_INTERVAL, "30000", - ComputerOptions.BSP_MAX_SUPER_STEP, "2", - ComputerOptions.WORKER_COMPUTATION_CLASS, - MockComputation.class.getName(), - ComputerOptions.ALGORITHM_RESULT_CLASS, - DoubleValue.class.getName(), - ComputerOptions.ALGORITHM_MESSAGE_CLASS, - DoubleValue.class.getName(), - ComputerOptions.OUTPUT_CLASS, - LimitedLogOutput.class.getName() + ComputerOptions.JOB_ID, "local_002", + ComputerOptions.JOB_WORKERS_COUNT, "1", + ComputerOptions.TRANSPORT_SERVER_PORT, "8086", + ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", + ComputerOptions.BSP_LOG_INTERVAL, "30000", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.WORKER_COMPUTATION_CLASS, + MockComputation.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, 
+ DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName(), + ComputerOptions.OUTPUT_CLASS, + LimitedLogOutput.class.getName() ); - WorkerService workerService = new MockWorkerService(); - try { + try (WorkerService workerService = new MockWorkerService()) { workerService.init(config); workerService.execute(); } catch (Throwable e) { LOG.error("Failed to start worker", e); exceptions[0] = e; } finally { - workerService.close(); - try { - workerService.close(); - } catch (Throwable e) { - Assert.fail(e.getMessage()); - } countDownLatch.countDown(); } }); pool.submit(() -> { Config config = UnitTestBase.updateWithRequiredOptions( - RpcOptions.RPC_SERVER_HOST, "localhost", - ComputerOptions.JOB_ID, "local_002", - ComputerOptions.JOB_WORKERS_COUNT, "1", - ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", - ComputerOptions.BSP_LOG_INTERVAL, "30000", - ComputerOptions.BSP_MAX_SUPER_STEP, "2", - ComputerOptions.MASTER_COMPUTATION_CLASS, - MockMasterComputation.class.getName(), - ComputerOptions.ALGORITHM_RESULT_CLASS, - DoubleValue.class.getName(), - ComputerOptions.ALGORITHM_MESSAGE_CLASS, - DoubleValue.class.getName() + RpcOptions.RPC_SERVER_HOST, "localhost", + ComputerOptions.JOB_ID, "local_002", + ComputerOptions.JOB_WORKERS_COUNT, "1", + ComputerOptions.BSP_REGISTER_TIMEOUT, "100000", + ComputerOptions.BSP_LOG_INTERVAL, "30000", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.MASTER_COMPUTATION_CLASS, + MockMasterComputation.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); - MasterService masterService = new MasterService(); - try { + try (MasterService masterService = new MasterService()) { masterService.init(config); masterService.execute(); } catch (Throwable e) { @@ -108,12 +100,6 @@ public void testServiceWith1Worker() throws InterruptedException { * if count down is executed first, and the 
server thread in * master service will not be closed. */ - masterService.close(); - try { - masterService.close(); - } catch (Throwable e) { - Assert.fail(e.getMessage()); - } countDownLatch.countDown(); } }); @@ -121,8 +107,7 @@ public void testServiceWith1Worker() throws InterruptedException { countDownLatch.await(); pool.shutdownNow(); - Assert.assertFalse(Arrays.asList(exceptions).toString(), - existError(exceptions)); + Assert.assertFalse(Arrays.asList(exceptions).toString(), existError(exceptions)); } @Test @@ -133,89 +118,84 @@ public void testServiceWith2Workers() throws InterruptedException { pool.submit(() -> { Config config = UnitTestBase.updateWithRequiredOptions( - ComputerOptions.JOB_ID, "local_003", - ComputerOptions.JOB_WORKERS_COUNT, "2", - ComputerOptions.JOB_PARTITIONS_COUNT, "2", - ComputerOptions.TRANSPORT_SERVER_PORT, "8086", - ComputerOptions.WORKER_DATA_DIRS, "[job_8086]", - ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", - ComputerOptions.BSP_LOG_INTERVAL, "10000", - ComputerOptions.BSP_MAX_SUPER_STEP, "2", - ComputerOptions.WORKER_COMPUTATION_CLASS, - MockComputation2.class.getName(), - ComputerOptions.ALGORITHM_RESULT_CLASS, - DoubleValue.class.getName(), - ComputerOptions.ALGORITHM_MESSAGE_CLASS, - DoubleValue.class.getName() + ComputerOptions.JOB_ID, "local_003", + ComputerOptions.JOB_WORKERS_COUNT, "2", + ComputerOptions.JOB_PARTITIONS_COUNT, "2", + ComputerOptions.TRANSPORT_SERVER_PORT, "8086", + ComputerOptions.WORKER_DATA_DIRS, "[job_8086]", + ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", + ComputerOptions.BSP_LOG_INTERVAL, "10000", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.WORKER_COMPUTATION_CLASS, + MockComputation2.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); - WorkerService workerService = new MockWorkerService(); - try { + + try (WorkerService workerService = new MockWorkerService()) { 
workerService.init(config); workerService.execute(); } catch (Throwable e) { LOG.error("Failed to start worker", e); exceptions[0] = e; } finally { - workerService.close(); countDownLatch.countDown(); } }); pool.submit(() -> { Config config = UnitTestBase.updateWithRequiredOptions( - ComputerOptions.JOB_ID, "local_003", - ComputerOptions.JOB_WORKERS_COUNT, "2", - ComputerOptions.JOB_PARTITIONS_COUNT, "2", - ComputerOptions.TRANSPORT_SERVER_PORT, "8087", - ComputerOptions.WORKER_DATA_DIRS, "[job_8087]", - ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", - ComputerOptions.BSP_LOG_INTERVAL, "10000", - ComputerOptions.BSP_MAX_SUPER_STEP, "2", - ComputerOptions.WORKER_COMPUTATION_CLASS, - MockComputation2.class.getName(), - ComputerOptions.ALGORITHM_RESULT_CLASS, - DoubleValue.class.getName(), - ComputerOptions.ALGORITHM_MESSAGE_CLASS, - DoubleValue.class.getName() + ComputerOptions.JOB_ID, "local_003", + ComputerOptions.JOB_WORKERS_COUNT, "2", + ComputerOptions.JOB_PARTITIONS_COUNT, "2", + ComputerOptions.TRANSPORT_SERVER_PORT, "8087", + ComputerOptions.WORKER_DATA_DIRS, "[job_8087]", + ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", + ComputerOptions.BSP_LOG_INTERVAL, "10000", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.WORKER_COMPUTATION_CLASS, + MockComputation2.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); - WorkerService workerService = new MockWorkerService(); - try { + try (WorkerService workerService = new MockWorkerService()) { workerService.init(config); workerService.execute(); } catch (Throwable e) { LOG.error("Failed to start worker", e); exceptions[1] = e; } finally { - workerService.close(); countDownLatch.countDown(); } }); pool.submit(() -> { Config config = UnitTestBase.updateWithRequiredOptions( - RpcOptions.RPC_SERVER_HOST, "localhost", - ComputerOptions.JOB_ID, "local_003", - ComputerOptions.JOB_WORKERS_COUNT, "2", - 
ComputerOptions.JOB_PARTITIONS_COUNT, "2", - ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", - ComputerOptions.BSP_LOG_INTERVAL, "10000", - ComputerOptions.BSP_MAX_SUPER_STEP, "2", - ComputerOptions.MASTER_COMPUTATION_CLASS, - MockMasterComputation2.class.getName(), - ComputerOptions.ALGORITHM_RESULT_CLASS, - DoubleValue.class.getName(), - ComputerOptions.ALGORITHM_MESSAGE_CLASS, - DoubleValue.class.getName() + RpcOptions.RPC_SERVER_HOST, "localhost", + ComputerOptions.JOB_ID, "local_003", + ComputerOptions.JOB_WORKERS_COUNT, "2", + ComputerOptions.JOB_PARTITIONS_COUNT, "2", + ComputerOptions.BSP_REGISTER_TIMEOUT, "30000", + ComputerOptions.BSP_LOG_INTERVAL, "10000", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.MASTER_COMPUTATION_CLASS, + MockMasterComputation2.class.getName(), + ComputerOptions.ALGORITHM_RESULT_CLASS, + DoubleValue.class.getName(), + ComputerOptions.ALGORITHM_MESSAGE_CLASS, + DoubleValue.class.getName() ); - MasterService masterService = new MasterService(); - try { + try (MasterService masterService = new MasterService()) { masterService.init(config); masterService.execute(); } catch (Throwable e) { LOG.error("Failed to start master", e); exceptions[2] = e; } finally { - masterService.close(); countDownLatch.countDown(); } }); @@ -223,41 +203,37 @@ public void testServiceWith2Workers() throws InterruptedException { countDownLatch.await(); pool.shutdownNow(); - Assert.assertFalse(Arrays.asList(exceptions).toString(), - existError(exceptions)); + Assert.assertFalse(Arrays.asList(exceptions).toString(), existError(exceptions)); } @Test public void testFailToConnectEtcd() { Config config = UnitTestBase.updateWithRequiredOptions( - // Unavailable etcd endpoints - ComputerOptions.BSP_ETCD_ENDPOINTS, "http://abc:8098", - ComputerOptions.JOB_ID, "local_004", - ComputerOptions.JOB_WORKERS_COUNT, "1", - ComputerOptions.BSP_LOG_INTERVAL, "30000", - ComputerOptions.BSP_MAX_SUPER_STEP, "2", - ComputerOptions.WORKER_COMPUTATION_CLASS, - 
MockComputation.class.getName() + // Unavailable etcd endpoints + ComputerOptions.BSP_ETCD_ENDPOINTS, "http://invalid-ip:8098", + ComputerOptions.JOB_ID, "local_004", + ComputerOptions.JOB_WORKERS_COUNT, "1", + ComputerOptions.BSP_LOG_INTERVAL, "30000", + ComputerOptions.BSP_MAX_SUPER_STEP, "2", + ComputerOptions.WORKER_COMPUTATION_CLASS, + MockComputation.class.getName() ); - WorkerService workerService = new MockWorkerService(); - Assert.assertThrows(ComputerException.class, () -> { - workerService.init(config); - try { + + try (WorkerService workerService = new MockWorkerService()) { + Assert.assertThrows(ComputerException.class, () -> { + workerService.init(config); workerService.execute(); - } finally { - workerService.close(); - } - }, e -> { - Assert.assertContains("Error while getting with " + - "key='BSP_MASTER_INIT_DONE'", - e.getMessage()); - Assert.assertContains("UNAVAILABLE: unresolved address", - e.getCause().getMessage()); - }); + }, e -> { + Assert.assertContains("Error while getting with key='BSP_MASTER_INIT_DONE'", + e.getMessage()); + Assert.assertContains("UNAVAILABLE: unresolved address", + e.getCause().getMessage()); + }); + } } @Test - public void testDataTransportManagerFail() throws InterruptedException { + public void testDataTransportManagerFail() { /* * TODO: Complete this test case after data transport manager is * completed. 
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java index 7ac3e6cb1..b95b602ee 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java @@ -79,8 +79,7 @@ public void setup() throws IOException { File tempFile = File.createTempFile(UUID.randomUUID().toString(), ""); try { String absolutePath = tempFile.getAbsolutePath(); - this.updateOptions(KubeDriverOptions.KUBE_CONFIG.name(), - absolutePath); + this.updateOptions(KubeDriverOptions.KUBE_CONFIG.name(), absolutePath); NamedCluster cluster = new NamedClusterBuilder() .withName("kubernetes") .withNewCluster() @@ -96,10 +95,11 @@ public void setup() throws IOException { .endContext() .build(); io.fabric8.kubernetes.api.model.Config config = Config.builder() - .withClusters(cluster) - .addToContexts(context) - .withCurrentContext(context.getName()) - .build(); + .withClusters(cluster) + .addToContexts(context) + .withCurrentContext( + context.getName()) + .build(); KubeConfigUtils.persistKubeConfigIntoFile(config, absolutePath); System.setProperty(Config.KUBERNETES_KUBECONFIG_FILE, absolutePath); @@ -126,24 +126,18 @@ public void testConstruct() { String namespace = Whitebox.getInternalState(this.driver, "namespace"); HugeConfig conf = Whitebox.getInternalState(this.driver, "conf"); Object operation = Whitebox.getInternalState(this.driver, "operation"); - MutableBoolean watchActive = Whitebox.getInternalState( - this.driver, "watchActive"); + MutableBoolean watchActive = Whitebox.getInternalState(this.driver, "watchActive"); Assert.assertTrue(watchActive.booleanValue()); Assert.assertEquals(namespace, "test"); Assert.assertNotNull(conf); Assert.assertNotNull(operation); final int workerInstances = 2; - 
this.updateOptions(KubeSpecOptions.WORKER_INSTANCES.name(), - workerInstances); - Map defaultSpec = Whitebox.invoke( - KubernetesDriver.class, - "defaultSpec", - this.driver); - String workerInstancesKey = KubeUtil.covertSpecKey( - KubeSpecOptions.WORKER_INSTANCES.name()); - Assert.assertEquals(defaultSpec.get(workerInstancesKey), - workerInstances); + this.updateOptions(KubeSpecOptions.WORKER_INSTANCES.name(), workerInstances); + Map defaultSpec = Whitebox.invoke(KubernetesDriver.class, + "defaultSpec", this.driver); + String workerInstancesKey = KubeUtil.covertSpecKey(KubeSpecOptions.WORKER_INSTANCES.name()); + Assert.assertEquals(defaultSpec.get(workerInstancesKey), workerInstances); } @Test @@ -167,21 +161,23 @@ public void testUploadAlgorithmJar() throws FileNotFoundException { } @Test - public void testUploadAlgorithmJarWithError() throws FileNotFoundException { + public void testUploadAlgorithmJarWithError() { Whitebox.setInternalState(this.driver, "bashPath", "conf/images/upload_test-x.sh"); String url = "https://github.com/apache/hugegraph-doc/raw/" + "binary-1.0/dist/computer/test.jar"; String path = "conf/images/test.jar"; downloadFileByUrl(url, path); - InputStream inputStream = new FileInputStream(path); - Assert.assertThrows(ComputerDriverException.class, () -> { - this.driver.uploadAlgorithmJar("PageRank", inputStream); - }, e -> { - ComputerDriverException exception = (ComputerDriverException) e; - Assert.assertContains("No such file", - exception.rootCause().getMessage()); - }); + try (InputStream inputStream = new FileInputStream(path)) { + Assert.assertThrows(ComputerDriverException.class, () -> { + this.driver.uploadAlgorithmJar("PageRank", inputStream); + }, e -> { + ComputerDriverException exception = (ComputerDriverException) e; + Assert.assertContains("No such file", exception.rootCause().getMessage()); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Test @@ -189,12 +185,9 @@ public void testSubmitJob() { Map params 
= new HashMap<>(); params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10"); String jobId = this.driver.submitJob("PageRank", params); - HugeGraphComputerJob computerJob = this.operation - .withName(KubeUtil.crName(jobId)) - .get(); + HugeGraphComputerJob computerJob = this.operation.withName(KubeUtil.crName(jobId)).get(); Assert.assertNotNull(computerJob); - Assert.assertEquals(computerJob.getSpec().getAlgorithmName(), - "PageRank"); + Assert.assertEquals(computerJob.getSpec().getAlgorithmName(), "PageRank"); Assert.assertEquals(computerJob.getSpec().getJobId(), jobId); } @@ -205,16 +198,13 @@ public void testCancelJob() { String jobId = this.driver.submitJob("PageRank2", params); String crName = KubeUtil.crName(jobId); - HugeGraphComputerJob computerJob = this.operation.withName(crName) - .get(); + HugeGraphComputerJob computerJob = this.operation.withName(crName).get(); Assert.assertNotNull(computerJob); UnitTestBase.sleep(1000L); this.driver.cancelJob(jobId, params); - HugeGraphComputerJob canceledComputerJob = this.operation - .withName(crName) - .get(); + HugeGraphComputerJob canceledComputerJob = this.operation.withName(crName).get(); Assert.assertNull(canceledComputerJob); Assert.assertNull(this.driver.jobState(jobId, params)); } @@ -227,26 +217,21 @@ public void testWatchJobAndCancel() { JobObserver jobObserver = Mockito.mock(JobObserver.class); - CompletableFuture future = this.driver.waitJobAsync(jobId, - params, - jobObserver); + CompletableFuture future = this.driver.waitJobAsync(jobId, params, jobObserver); Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1)) .onJobStateChanged(Mockito.any(DefaultJobState.class)); future.getNow(null); - MutableBoolean watchActive = Whitebox.getInternalState(this.driver, - "watchActive"); + MutableBoolean watchActive = Whitebox.getInternalState(this.driver, "watchActive"); watchActive.setFalse(); this.driver.waitJobAsync(jobId, params, jobObserver); this.driver.cancelJob(jobId, params); 
UnitTestBase.sleep(1000L); - CompletableFuture future2 = this.driver.waitJobAsync(jobId, - params, - jobObserver); + CompletableFuture future2 = this.driver.waitJobAsync(jobId, params, jobObserver); Assert.assertNull(future2); } @@ -264,15 +249,12 @@ public void testJobState() { @Test public void testOnClose() { Map, JobObserver>> waits = - Whitebox.getInternalState(this.driver, "waits"); - waits.put("test-123", Pair.of(new CompletableFuture<>(), - Mockito.mock(JobObserver.class))); + Whitebox.getInternalState(this.driver, "waits"); + waits.put("test-123", Pair.of(new CompletableFuture<>(), Mockito.mock(JobObserver.class))); - AbstractWatchManager watch = - Whitebox.getInternalState( - this.driver, "watch"); - Watcher watcher = Whitebox.getInternalState( - watch, "watcher"); + AbstractWatchManager watch = Whitebox.getInternalState(this.driver, + "watch"); + Watcher watcher = Whitebox.getInternalState(watch, "watcher"); watcher.eventReceived(Watcher.Action.ADDED, null); watcher.eventReceived(Watcher.Action.ERROR, new HugeGraphComputerJob()); @@ -283,8 +265,7 @@ public void testOnClose() { WatcherException testClose = new WatcherException("test close"); watcher.onClose(testClose); - MutableBoolean watchActive = Whitebox.getInternalState(this.driver, - "watchActive"); + MutableBoolean watchActive = Whitebox.getInternalState(this.driver, "watchActive"); Assert.assertFalse(watchActive.booleanValue()); } @@ -297,14 +278,12 @@ public void testCheckComputerConf() { Assert.assertThrows(IllegalArgumentException.class, () -> { this.driver.submitJob("PageRank3", params); }, e -> { - Assert.assertContains( - "The partitions count must be >= workers instances", - e.getMessage() + Assert.assertContains("The partitions count must be >= workers instances", + e.getMessage() ); }); - Map defaultConf = Whitebox.getInternalState( - this.driver, "defaultConf"); + Map defaultConf = Whitebox.getInternalState(this.driver, "defaultConf"); defaultConf = new HashMap<>(defaultConf); 
defaultConf.remove(ComputerOptions.ALGORITHM_PARAMS_CLASS.name()); Whitebox.setInternalState(this.driver, "defaultConf", defaultConf); @@ -312,9 +291,8 @@ public void testCheckComputerConf() { Assert.assertThrows(IllegalArgumentException.class, () -> { this.driver.submitJob("PageRank3", params); }, e -> { - Assert.assertContains( - "The [algorithm.params_class] options can't be null", - e.getMessage() + Assert.assertContains("The [algorithm.params_class] options can't be null", + e.getMessage() ); }); } diff --git a/pom.xml b/pom.xml index 2784670be..a0d0634db 100644 --- a/pom.xml +++ b/pom.xml @@ -31,7 +31,8 @@ hugegraph-computer https://github.com/apache/hugegraph-computer - hugegraph-computer is a fast-speed, highly-scalable, fault-tolerance graph processing system developed by apache. + hugegraph-computer is a fast-speed, highly scalable, fault-tolerance graph processing + system developed by apache. 2020 @@ -88,7 +89,12 @@ - 1.0.0 + + 1.2.0 + 1.2.0 + 1.2.0 + 1.2.0 + 1.2.0 UTF-8 ${project.basedir}/.. hugegraph-computer @@ -100,10 +106,6 @@ 3.1.2 4.1.42.Final 3.12.0 - 1.0.0 - 1.0.0 - 1.0.0 - 1.0.0 8.5.6 @@ -347,7 +349,7 @@ maven-compiler-plugin - + 3.1 ${compiler.source} @@ -526,5 +528,15 @@ + + + stage + + + staged-releases + https://repository.apache.org/content/groups/staging/ + + +