Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ public class FrontendsProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Name").add("IP").add("HostName").add("EditLogPort").add("HttpPort").add("QueryPort").add("RpcPort")
.add("Role").add("IsMaster").add("ClusterId").add("Join").add("Alive")
.add("ReplayedJournalId").add("LastHeartbeat").add("IsHelper").add("ErrMsg")
.add("ReplayedJournalId").add("LastHeartbeat").add("IsHelper").add("ErrMsg").add("Version")
.build();

public static final int HOSTNAME_INDEX = 2;

private Catalog catalog;

public FrontendsProcNode(Catalog catalog) {
this.catalog = catalog;
}

@Override
public ProcResult fetchResult() {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);

List<List<String>> infos = Lists.newArrayList();

getFrontendsInfo(catalog, infos);
Expand All @@ -82,13 +82,13 @@ public static void getFrontendsInfo(Catalog catalog, List<List<String>> infos) {
// this may happen when majority of FOLLOWERS are down and no MASTER right now.
LOG.warn("failed to get leader: {}", e.getMessage());
}

// get all node which are joined in bdb group
List<InetSocketAddress> allFe = catalog.getHaProtocol().getElectableNodes(true /* include leader */);
allFe.addAll(catalog.getHaProtocol().getObserverNodes());
List<Pair<String, Integer>> allFeHosts = convertToHostPortPair(allFe);
List<Pair<String, Integer>> helperNodes = catalog.getHelperNodes();

for (Frontend fe : catalog.getFrontends(null /* all */)) {

List<String> info = new ArrayList<String>();
Expand All @@ -112,7 +112,7 @@ public static void getFrontendsInfo(Catalog catalog, List<List<String>> infos) {

info.add(Integer.toString(catalog.getClusterId()));
info.add(String.valueOf(isJoin(allFeHosts, fe)));

if (fe.getHost().equals(catalog.getSelfNode().first)) {
info.add("true");
info.add(Long.toString(catalog.getEditLog().getMaxJournalId()));
Expand All @@ -121,15 +121,17 @@ public static void getFrontendsInfo(Catalog catalog, List<List<String>> infos) {
info.add(Long.toString(fe.getReplayedJournalId()));
}
info.add(TimeUtils.longToTimeString(fe.getLastUpdateTime()));

info.add(String.valueOf(isHelperNode(helperNodes, fe)));

info.add(fe.getHeartbeatErrMsg());

info.add(fe.getVersion());

infos.add(info);
}
}

private static boolean isHelperNode(List<Pair<String, Integer>> helperNodes, Frontend fe) {
return helperNodes.stream().anyMatch(p -> p.first.equals(fe.getHost()) && p.second == fe.getEditLogPort());
}
Expand All @@ -142,7 +144,7 @@ private static boolean isJoin(List<Pair<String, Integer>> allFeHosts, Frontend f
}
return false;
}

private static List<Pair<String, Integer>> convertToHostPortPair(List<InetSocketAddress> addrs) {
List<Pair<String, Integer>> hostPortPair = Lists.newArrayList();
for (InetSocketAddress addr : addrs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Version;
import org.apache.doris.http.ActionController;
import org.apache.doris.http.BaseRequest;
import org.apache.doris.http.BaseResponse;
Expand All @@ -43,6 +44,7 @@ public class BootstrapFinishAction extends RestBaseAction {
public static final String REPLAYED_JOURNAL_ID = "replayedJournalId";
public static final String QUERY_PORT = "queryPort";
public static final String RPC_PORT = "rpcPort";
public static final String VERSION = "version";

public BootstrapFinishAction(ActionController controller) {
super(controller);
Expand Down Expand Up @@ -92,6 +94,7 @@ public void execute(BaseRequest request, BaseResponse response) throws DdlExcept
result.setMaxReplayedJournal(replayedJournalId);
result.setQueryPort(Config.query_port);
result.setRpcPort(Config.rpc_port);
result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
}
}
} else {
Expand All @@ -108,6 +111,7 @@ public static class BootstrapResult extends RestBaseResult {
private long replayedJournalId = 0;
private int queryPort = 0;
private int rpcPort = 0;
private String version = "";

public BootstrapResult() {
super();
Expand Down Expand Up @@ -141,6 +145,14 @@ public int getRpcPort() {
return rpcPort;
}

public String getVersion() {
return version;
}

public void setVersion(String version) {
this.version = version;
}

@Override
public String toJson() {
Gson gson = new Gson();
Expand Down
16 changes: 11 additions & 5 deletions fe/src/main/java/org/apache/doris/system/Frontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class Frontend implements Writable {
private String nodeName;
private String host;
private int editLogPort;

private String version;

private int queryPort;
private int rpcPort;

Expand All @@ -45,7 +46,7 @@ public class Frontend implements Writable {

public Frontend() {
}

public Frontend(FrontendNodeType role, String nodeName, String host, int editLogPort) {
this.role = role;
this.nodeName = nodeName;
Expand All @@ -56,11 +57,15 @@ public Frontend(FrontendNodeType role, String nodeName, String host, int editLog
public FrontendNodeType getRole() {
return this.role;
}

public String getHost() {
return this.host;
}


public String getVersion() {
return version;
}

public String getNodeName() {
return nodeName;
}
Expand All @@ -76,7 +81,7 @@ public int getRpcPort() {
public boolean isAlive() {
return isAlive;
}

public int getEditLogPort() {
return this.editLogPort;
}
Expand All @@ -103,6 +108,7 @@ public boolean handleHbResponse(FrontendHbResponse hbResponse) {
boolean isChanged = false;
if (hbResponse.getStatus() == HbStatus.OK) {
isAlive = true;
version = hbResponse.getVersion();
queryPort = hbResponse.getQueryPort();
rpcPort = hbResponse.getRpcPort();
replayedJournalId = hbResponse.getReplayedJournalId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,21 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
private int queryPort;
private int rpcPort;
private long replayedJournalId;
private String version;

public FrontendHbResponse() {
super(HeartbeatResponse.Type.FRONTEND);
}

public FrontendHbResponse(String name, int queryPort, int rpcPort, long replayedJournalId, long hbTime) {
public FrontendHbResponse(String name, int queryPort, int rpcPort, long replayedJournalId, long hbTime, String version) {
super(HeartbeatResponse.Type.FRONTEND);
this.status = HbStatus.OK;
this.name = name;
this.queryPort = queryPort;
this.rpcPort = rpcPort;
this.replayedJournalId = replayedJournalId;
this.hbTime = hbTime;
this.version = version;
}

public FrontendHbResponse(String name, String errMsg) {
Expand All @@ -72,6 +74,10 @@ public long getReplayedJournalId() {
return replayedJournalId;
}

public String getVersion() {
return version;
}

public static FrontendHbResponse read(DataInput in) throws IOException {
FrontendHbResponse result = new FrontendHbResponse();
result.readFields(in);
Expand Down Expand Up @@ -100,6 +106,7 @@ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(super.toString());
sb.append(", name: ").append(name);
sb.append(", version: ").append(version);
sb.append(", queryPort: ").append(queryPort);
sb.append(", rpcPort: ").append(rpcPort);
sb.append(", replayedJournalId: ").append(replayedJournalId);
Expand Down
13 changes: 8 additions & 5 deletions fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.Util;
import org.apache.doris.http.rest.BootstrapFinishAction;
Expand Down Expand Up @@ -95,7 +96,7 @@ public void setMaster(int clusterId, String token, long epoch) {
@Override
protected void runAfterCatalogReady() {
List<Future<HeartbeatResponse>> hbResponses = Lists.newArrayList();

// send backend heartbeat
for (Backend backend : nodeMgr.getIdToBackend().values()) {
BackendHeartbeatHandler handler = new BackendHeartbeatHandler(backend);
Expand Down Expand Up @@ -152,7 +153,7 @@ protected void runAfterCatalogReady() {
// we also add a 'mocked' master Frontends heartbeat response to synchronize master info to other Frontends.
hbPackage.addHbResponse(new FrontendHbResponse(masterFeNodeName,
Config.query_port, Config.rpc_port, Catalog.getCurrentCatalog().getEditLog().getMaxJournalId(),
System.currentTimeMillis()));
System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH));

// write edit log
Catalog.getCurrentCatalog().getEditLog().logHeartbeat(hbPackage);
Expand Down Expand Up @@ -186,7 +187,7 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(
hbResponse.getName(), hbResponse.getHost(), hbResponse.getPort());
if (broker != null) {
boolean isChanged = broker.handleHbResponse(hbResponse);
boolean isChanged = broker.handleHbResponse(hbResponse);
if (hbResponse.getStatus() != HbStatus.OK) {
// invalid all connections cached in ClientPool
ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.ip, broker.port));
Expand Down Expand Up @@ -276,7 +277,8 @@ public HeartbeatResponse call() {
// heartbeat to self
if (Catalog.getInstance().isReady()) {
return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port,
Catalog.getInstance().getReplayedJournalId(), System.currentTimeMillis());
Catalog.getInstance().getReplayedJournalId(), System.currentTimeMillis(),
Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
} else {
return new FrontendHbResponse(fe.getNodeName(), "not ready");
}
Expand All @@ -299,8 +301,9 @@ public HeartbeatResponse call() {
long replayedJournalId = root.getLong(BootstrapFinishAction.REPLAYED_JOURNAL_ID);
int queryPort = root.getInt(BootstrapFinishAction.QUERY_PORT);
int rpcPort = root.getInt(BootstrapFinishAction.RPC_PORT);
String version = root.getString(BootstrapFinishAction.VERSION);
return new FrontendHbResponse(fe.getNodeName(), queryPort, rpcPort, replayedJournalId,
System.currentTimeMillis());
System.currentTimeMillis(), version == null ? "unknown" : version);
}
} catch (Exception e) {
return new FrontendHbResponse(fe.getNodeName(),
Expand Down
2 changes: 2 additions & 0 deletions gensrc/script/gen_build_version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,15 @@ public class Version {

public static final String DORIS_BUILD_VERSION = "${build_version}";
public static final String DORIS_BUILD_HASH = "${build_hash}";
public static final String DORIS_BUILD_SHORT_HASH = "${build_short_hash}";
public static final String DORIS_BUILD_TIME = "${build_time}";
public static final String DORIS_BUILD_INFO = "${build_info}";
public static final String DORIS_JAVA_COMPILE_VERSION = "${java_version_str}";

public static void main(String[] args) {
System.out.println("doris_build_version: " + DORIS_BUILD_VERSION);
System.out.println("doris_build_hash: " + DORIS_BUILD_HASH);
System.out.println("doris_build_short_hash: " + DORIS_BUILD_SHORT_HASH);
System.out.println("doris_build_time: " + DORIS_BUILD_TIME);
System.out.println("doris_build_info: " + DORIS_BUILD_INFO);
System.out.println("doris_java_compile_version: " + DORIS_JAVA_COMPILE_VERSION);
Expand Down