Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ public class Config extends ConfigBase {
* When enable_fqdn_mode is true, the name of the pod where be is located will remain unchanged
* after reconstruction, while the ip can be changed.
*/
@ConfField(mutable = false, masterOnly = true, expType = ExperimentalType.EXPERIMENTAL)
@ConfField(mutable = false, expType = ExperimentalType.EXPERIMENTAL)
public static boolean enable_fqdn_mode = false;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES) {
if (title.equals(BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES.get(BrokerMgr.HOSTNAME_INDEX))) {
// SHOW BROKER does not show hostname
continue;
}
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();
Expand Down
14 changes: 6 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcNodeInterface;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;

import com.google.common.collect.ArrayListMultimap;
Expand All @@ -50,7 +49,7 @@
*/
public class BrokerMgr {
public static final ImmutableList<String> BROKER_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Name").add("IP").add("HostName").add("Port").add("Alive")
.add("Name").add("Host").add("Port").add("Alive")
.add("LastStartTime").add("LastUpdateTime").add("ErrMsg")
.build();

Expand Down Expand Up @@ -228,7 +227,7 @@ public void addBrokers(String name, Collection<Pair<String, Integer>> addresses)
}
Env.getCurrentEnv().getEditLog().logAddBroker(new ModifyBrokerInfo(name, addedBrokerAddress));
for (FsBroker address : addedBrokerAddress) {
brokerAddrsMap.put(address.ip, address);
brokerAddrsMap.put(address.host, address);
}
brokersMap.put(name, brokerAddrsMap);
brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values()));
Expand All @@ -246,7 +245,7 @@ public void replayAddBrokers(String name, List<FsBroker> addresses) {
brokersMap.put(name, brokerAddrsMap);
}
for (FsBroker address : addresses) {
brokerAddrsMap.put(address.ip, address);
brokerAddrsMap.put(address.host, address);
}

brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values()));
Expand Down Expand Up @@ -280,7 +279,7 @@ public void dropBrokers(String name, Collection<Pair<String, Integer>> addresses
}
Env.getCurrentEnv().getEditLog().logDropBroker(new ModifyBrokerInfo(name, droppedAddressList));
for (FsBroker address : droppedAddressList) {
brokerAddrsMap.remove(address.ip, address);
brokerAddrsMap.remove(address.host, address);
}

brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values()));
Expand All @@ -294,7 +293,7 @@ public void replayDropBrokers(String name, List<FsBroker> addresses) {
try {
ArrayListMultimap<String, FsBroker> brokerAddrsMap = brokersMap.get(name);
for (FsBroker addr : addresses) {
brokerAddrsMap.remove(addr.ip, addr);
brokerAddrsMap.remove(addr.host, addr);
}

brokerListMap.put(name, Lists.newArrayList(brokerAddrsMap.values()));
Expand Down Expand Up @@ -364,8 +363,7 @@ public ProcResult fetchResult() {
for (FsBroker broker : entry.getValue().values()) {
List<String> row = Lists.newArrayList();
row.add(brokerName);
row.add(broker.ip);
row.add(NetUtils.getHostnameByIp(broker.ip));
row.add(broker.host);
row.add(String.valueOf(broker.port));
row.add(String.valueOf(broker.isAlive));
row.add(TimeUtils.longToTimeString(broker.lastStartTime));
Expand Down
18 changes: 9 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.io.IOException;

public class FsBroker implements Writable, Comparable<FsBroker> {
@SerializedName(value = "ip")
public String ip;
@SerializedName(value = "host", alternate = {"ip"})
public String host;
@SerializedName(value = "port")
public int port;
// msg for ping result
Expand All @@ -46,8 +46,8 @@ public class FsBroker implements Writable, Comparable<FsBroker> {
public FsBroker() {
}

public FsBroker(String ip, int port) {
this.ip = ip;
public FsBroker(String host, int port) {
this.host = host;
this.port = port;
}

Expand Down Expand Up @@ -90,19 +90,19 @@ public boolean equals(Object o) {
FsBroker other = (FsBroker) o;

return port == other.port
&& ip.equals(other.ip);
&& host.equals(other.host);
}

@Override
public int hashCode() {
int result = ip.hashCode();
int result = host.hashCode();
result = 31 * result + port;
return result;
}

@Override
public int compareTo(FsBroker o) {
int ret = ip.compareTo(o.ip);
int ret = host.compareTo(o.host);
if (ret != 0) {
return ret;
}
Expand All @@ -117,13 +117,13 @@ public void write(DataOutput out) throws IOException {

@Deprecated
private void readFields(DataInput in) throws IOException {
ip = Text.readString(in);
host = Text.readString(in);
port = in.readInt();
}

@Override
public String toString() {
return ip + ":" + port;
return host + ":" + port;
}

public static FsBroker readIn(DataInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public static TNetworkAddress getAddress(BrokerDesc brokerDesc) throws UserExcep
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
return new TNetworkAddress(broker.ip, broker.port);
return new TNetworkAddress(broker.host, broker.port);
}

public static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ private HostInfo convertToHostInfo(Frontend frontend) {
}

private HostInfo convertToHostInfo(FsBroker broker) {
return new HostInfo(broker.ip, broker.port);
return new HostInfo(broker.host, broker.port);
}

private HostInfo convertToHostInfo(Backend backend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Pair<TPaloBrokerService.Client, TNetworkAddress> getBroker() {
LOG.warn("failed to get a broker address: " + e.getMessage());
return null;
}
TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
TNetworkAddress address = new TNetworkAddress(broker.host, broker.port);
TPaloBrokerService.Client client;
try {
client = ClientPool.brokerPool.borrowObject(address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,11 @@ private Set<Long> submitPushTasks() throws UserException {
FsBroker fsBroker = Env.getCurrentEnv().getBrokerMgr().getBroker(
brokerDesc.getName(), backend.getHost());
tBrokerScanRange.getBrokerAddresses().add(
new TNetworkAddress(fsBroker.ip, fsBroker.port));
new TNetworkAddress(fsBroker.host, fsBroker.port));

LOG.debug("push task for replica {}, broker {}:{},"
+ " backendId {}, filePath {}, fileSize {}",
replicaId, fsBroker.ip,
replicaId, fsBroker.host,
fsBroker.port, backendId, tBrokerRangeDesc.path,
tBrokerRangeDesc.file_size);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected TDataSink toThrift() {
if (brokerDesc.getFileType() == TFileType.FILE_BROKER) {
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerDesc.getName());
if (broker != null) {
tExportSink.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
tExportSink.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
}
tExportSink.setProperties(brokerDesc.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ protected TScanRangeLocations newLocations(TFileScanRangeParams params, BrokerDe
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
} else {
params.setBrokerAddresses(new ArrayList<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void createScanRangeLocations() throws UserException {
if (broker == null) {
throw new UserException("No alive broker.");
}
params.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
}
} else if (locationType == TFileType.FILE_S3) {
params.setProperties(locationProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ public void exec() throws Exception {
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
topResultFileSink.setBrokerAddr(broker.ip, broker.port);
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {
// This is a load process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.blockrule.SqlBlockRule;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
Expand Down Expand Up @@ -1754,9 +1753,6 @@ private void handleShowTablet() throws AnalysisException {
private void handleShowBroker() {
ShowBrokerStmt showStmt = (ShowBrokerStmt) stmt;
List<List<String>> brokersInfo = Env.getCurrentEnv().getBrokerMgr().getBrokersInfo();
for (List<String> row : brokersInfo) {
row.remove(BrokerMgr.HOSTNAME_INDEX);
}

// Only success
resultSet = new ShowResultSet(showStmt.getMetaData(), brokersInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private boolean handleHbResponse(HeartbeatResponse response, boolean isReplay) {
boolean isChanged = broker.handleHbResponse(hbResponse);
if (hbResponse.getStatus() != HbStatus.OK) {
// invalid all connections cached in ClientPool
ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.ip, broker.port));
ClientPool.brokerPool.clearPool(new TNetworkAddress(broker.host, broker.port));
}
return isChanged;
}
Expand Down Expand Up @@ -352,7 +352,7 @@ public BrokerHeartbeatHandler(String brokerName, FsBroker broker, String clientI
@Override
public HeartbeatResponse call() {
TPaloBrokerService.Client client = null;
TNetworkAddress addr = new TNetworkAddress(broker.ip, broker.port);
TNetworkAddress addr = new TNetworkAddress(broker.host, broker.port);
boolean ok = false;
try {
client = ClientPool.brokerPool.borrowObject(addr);
Expand All @@ -362,13 +362,13 @@ public HeartbeatResponse call() {
ok = true;

if (status.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new BrokerHbResponse(brokerName, broker.ip, broker.port, status.getMessage());
return new BrokerHbResponse(brokerName, broker.host, broker.port, status.getMessage());
} else {
return new BrokerHbResponse(brokerName, broker.ip, broker.port, System.currentTimeMillis());
return new BrokerHbResponse(brokerName, broker.host, broker.port, System.currentTimeMillis());
}

} catch (Exception e) {
return new BrokerHbResponse(brokerName, broker.ip, broker.port,
return new BrokerHbResponse(brokerName, broker.host, broker.port,
Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
} finally {
if (ok) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Map<String, String> getBrokerProperties() {
}

public TDownloadReq toThrift() {
TNetworkAddress address = new TNetworkAddress(brokerAddr.ip, brokerAddr.port);
TNetworkAddress address = new TNetworkAddress(brokerAddr.host, brokerAddr.port);
TDownloadReq req = new TDownloadReq(jobId, srcToDestPath, address);
req.setBrokerProp(brokerProperties);
req.setStorageBackend(storageType.toThrift());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Map<String, String> getBrokerProperties() {
}

public TUploadReq toThrift() {
TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
TNetworkAddress address = new TNetworkAddress(broker.host, broker.port);
TUploadReq request = new TUploadReq(jobId, srcToDestPath, address);
request.setBrokerProp(brokerProperties);
request.setStorageBackend(storageType.toThrift());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testHeartbeatOk() throws Exception {
DataInputStream dis = new DataInputStream(new FileInputStream(file));

FsBroker readBroker = FsBroker.readIn(dis);
Assert.assertEquals(fsBroker.ip, readBroker.ip);
Assert.assertEquals(fsBroker.host, readBroker.host);
Assert.assertEquals(fsBroker.port, readBroker.port);
Assert.assertEquals(fsBroker.isAlive, readBroker.isAlive);
Assert.assertTrue(fsBroker.isAlive);
Expand All @@ -98,7 +98,7 @@ public void testHeartbeatFailed() throws Exception {
DataInputStream dis = new DataInputStream(new FileInputStream(file));

FsBroker readBroker = FsBroker.readIn(dis);
Assert.assertEquals(fsBroker.ip, readBroker.ip);
Assert.assertEquals(fsBroker.host, readBroker.host);
Assert.assertEquals(fsBroker.port, readBroker.port);
Assert.assertEquals(fsBroker.isAlive, readBroker.isAlive);
Assert.assertFalse(fsBroker.isAlive);
Expand Down