diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java index f163be11aa203f..9a78f9cbcf0bff 100644 --- a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java @@ -54,6 +54,10 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -140,6 +144,27 @@ public HadoopHudiJniScanner(int fetchSize, Map params) { @Override public void open() throws IOException { try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) { + // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck, + // so use another process to kill this stuck process. + // TODO(gaoxin): better way to solve the stuck process? + AtomicBoolean isKilled = new AtomicBoolean(false); + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + executorService.scheduleAtFixedRate(() -> { + if (!isKilled.get()) { + synchronized (HadoopHudiJniScanner.class) { + List pids = Utils.getChildProcessIds( + Utils.getCurrentProcId()); + for (long pid : pids) { + String cmd = Utils.getCommandLine(pid); + if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) { + Utils.killProcess(pid); + isKilled.set(true); + LOG.info("Kill hotspot debugger process " + pid); + } + } + } + } + }, 100, 1000, TimeUnit.MILLISECONDS); preExecutionAuthenticator.execute(() -> { initRequiredColumnsAndTypes(); initTableInfo(requiredTypes, requiredFields, fetchSize); @@ -147,7 +172,8 @@ public void open() throws IOException { initReader(properties); return null; }); - + isKilled.set(true); + executorService.shutdownNow(); } catch (Exception e) { close(); LOG.warn("failed to open hadoop hudi jni scanner", e); diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java new file mode 100644 index 00000000000000..566d2d574039bf --- /dev/null +++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/Utils.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi; + +import org.apache.commons.io.FileUtils; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; +import java.util.LinkedList; +import java.util.List; + +public class Utils { + public static long getCurrentProcId() { + try { + return ManagementFactory.getRuntimeMXBean().getPid(); + } catch (Exception e) { + throw new RuntimeException("Couldn't find PID of current JVM process.", e); + } + } + + public static List getChildProcessIds(long pid) { + try { + Process pgrep = (new ProcessBuilder("pgrep", "-P", String.valueOf(pid))).start(); + BufferedReader reader = new BufferedReader(new InputStreamReader(pgrep.getInputStream())); + List result = new LinkedList<>(); + String line; + while ((line = reader.readLine()) != null) { + result.add(Long.valueOf(line.trim())); + } + pgrep.waitFor(); + return result; + } catch (Exception e) { + throw new RuntimeException("Couldn't get child processes of PID " + pid, e); + } + } + + public static String getCommandLine(long pid) { + try { + return FileUtils.readFileToString(new File(String.format("/proc/%d/cmdline", pid))).trim(); + } catch (IOException e) { + return null; + } + } + + public static void killProcess(long pid) { + try { + Process kill = (new ProcessBuilder("kill", "-9", String.valueOf(pid))).start(); + kill.waitFor(); + } catch (Exception e) { + throw new RuntimeException("Couldn't kill process PID " + pid, e); + } + } +}