Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions fe/src/com/baidu/palo/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import com.baidu.palo.load.LoadJob.JobState;
import com.baidu.palo.master.Checkpoint;
import com.baidu.palo.master.MetaHelper;
import com.baidu.palo.persist.BackendIdsUpdateInfo;
import com.baidu.palo.persist.ClusterInfo;
import com.baidu.palo.persist.DatabaseInfo;
import com.baidu.palo.persist.DropInfo;
Expand All @@ -129,16 +130,15 @@
import com.baidu.palo.persist.Storage;
import com.baidu.palo.persist.StorageInfo;
import com.baidu.palo.persist.TableInfo;
import com.baidu.palo.persist.BackendIdsUpdateInfo;
import com.baidu.palo.qe.ConnectContext;
import com.baidu.palo.qe.JournalObservable;
import com.baidu.palo.qe.SessionVariable;
import com.baidu.palo.qe.VariableMgr;
import com.baidu.palo.service.FrontendOptions;
import com.baidu.palo.system.Backend;
import com.baidu.palo.system.Backend.BackendState;
import com.baidu.palo.system.Frontend;
import com.baidu.palo.system.SystemInfoService;
import com.baidu.palo.system.Backend.BackendState;
import com.baidu.palo.task.AgentBatchTask;
import com.baidu.palo.task.AgentTask;
import com.baidu.palo.task.AgentTaskExecutor;
Expand Down Expand Up @@ -181,12 +181,12 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Iterator;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -738,11 +738,7 @@ private void transferToMaster() throws IOException {
// catalog recycle bin
getRecycleBin().start();

if (!Config.master_ip.equals("0.0.0.0")) {
this.masterIp = Config.master_ip;
} else {
this.masterIp = FrontendOptions.getLocalHostAddress();
}
this.masterIp = FrontendOptions.getLocalHostAddress();
this.masterRpcPort = Config.rpc_port;
this.masterHttpPort = Config.http_port;

Expand Down
7 changes: 4 additions & 3 deletions fe/src/com/baidu/palo/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ public class Config extends ConfigBase {
@ConfField public static String replica_ack_policy = "SIMPLE_MAJORITY"; // ALL, NONE, SIMPLE_MAJORITY

/*
* Specified a ip for frontend, instead of ip get by *InetAddress.getByName*.
* This can be used when *InetAddress.getByName* get a unexpected ip address.
* Specified an IP for frontend, instead of the ip get by *InetAddress.getByName*.
* This can be used when *InetAddress.getByName* get an unexpected IP address.
* Default is "0.0.0.0", which means not set.
* CAN NOT set this as a hostname, only IP.
*/
@ConfField public static String master_ip = "0.0.0.0";
@ConfField public static String frontend_address = "0.0.0.0";

/*
* Kudu is currently not supported.
Expand Down
6 changes: 3 additions & 3 deletions fe/src/com/baidu/palo/http/rest/RestBaseAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;

import java.net.URI;
import java.net.URISyntaxException;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;

public class RestBaseAction extends BaseAction {
private static final Logger LOG = LogManager.getLogger(RestBaseAction.class);

Expand Down
34 changes: 18 additions & 16 deletions fe/src/com/baidu/palo/journal/bdbje/BDBJournalCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@

package com.baidu.palo.journal.bdbje;

import com.baidu.palo.journal.JournalCursor;
import com.baidu.palo.journal.JournalEntity;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import com.baidu.palo.journal.JournalCursor;
import com.baidu.palo.journal.JournalEntity;

import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.util.List;

public class BDBJournalCursor implements JournalCursor {
Expand Down Expand Up @@ -102,8 +103,9 @@ public JournalEntity next() {
// null means perform the operation without transaction protection.
// READ_COMMITTED guarantees no dirty read.
int tryTimes = 0;
while (true) {
if (database.get(null, theKey, theData, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS) {
while (true) {
OperationStatus operationStatus = database.get(null, theKey, theData, LockMode.READ_COMMITTED);
if (operationStatus == OperationStatus.SUCCESS) {
// Recreate the data String.
byte[] retData = theData.getData();
DataInputStream in = new DataInputStream(new ByteArrayInputStream(retData));
Expand All @@ -123,7 +125,7 @@ public JournalEntity next() {
continue;
} else if (tryTimes < maxTryTime) {
tryTimes++;
LOG.warn("fail to get journal {}, will try again", currentKey);
LOG.warn("fail to get journal {}, will try again. status: {}", currentKey, operationStatus);
Thread.sleep(3000);
continue;
} else {
Expand Down
29 changes: 21 additions & 8 deletions fe/src/com/baidu/palo/service/FrontendOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,35 @@

package com.baidu.palo.service;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.List;
import java.util.ArrayList;
import com.baidu.palo.common.Config;
import com.baidu.palo.common.util.NetUtils;

import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.baidu.palo.common.util.NetUtils;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;


public class FrontendOptions {
private static final Logger LOG = LogManager.getLogger(FrontendOptions.class);

public static void init() {
private static InetAddress localHost;

public static void init() throws UnknownHostException {
if (!Config.frontend_address.equals("0.0.0.0")) {
if (!InetAddressValidator.getInstance().isValidInet4Address(Config.frontend_address)) {
throw new UnknownHostException("invalid frontend_address: " + Config.frontend_address);
}
localHost = InetAddress.getByName(Config.frontend_address);
return;
}

// if not set frontend_address, get a non-loopback ip
List<InetAddress> hosts = new ArrayList<InetAddress>();
NetUtils.getHosts(hosts);
if (hosts.isEmpty()) {
Expand All @@ -49,6 +63,7 @@ public static void init() {
}
}

// nothing found, use loopback addr
if (localHost == null) {
localHost = loopBack;
}
Expand All @@ -61,7 +76,5 @@ public static InetAddress getLocalHost() {
public static String getLocalHostAddress() {
return localHost.getHostAddress();
}

private static InetAddress localHost;
};

89 changes: 45 additions & 44 deletions fe/src/com/baidu/palo/system/SystemInfoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,56 @@

package com.baidu.palo.system;

import java.util.Collections;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import com.baidu.palo.catalog.Catalog;
import com.baidu.palo.catalog.Database;
import com.baidu.palo.cluster.Cluster;
import com.baidu.palo.common.AnalysisException;
import com.baidu.palo.common.ClientPool;
import com.baidu.palo.common.DdlException;
import com.baidu.palo.common.FeConstants;
import com.baidu.palo.common.FeMetaVersion;
import com.baidu.palo.common.Pair;
import com.baidu.palo.common.util.Daemon;
import com.baidu.palo.system.Backend.BackendState;
import com.baidu.palo.system.BackendEvent.BackendEventType;
import com.baidu.palo.thrift.HeartbeatService;
import com.baidu.palo.thrift.TBackendInfo;
import com.baidu.palo.thrift.THeartbeatResult;
import com.baidu.palo.thrift.TMasterInfo;
import com.baidu.palo.thrift.TNetworkAddress;
import com.baidu.palo.thrift.TStatusCode;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;

import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.baidu.palo.catalog.Catalog;
import com.baidu.palo.catalog.Database;
import com.baidu.palo.cluster.Cluster;
import com.baidu.palo.common.AnalysisException;
import com.baidu.palo.common.ClientPool;
import com.baidu.palo.common.DdlException;
import com.baidu.palo.common.FeConstants;
import com.baidu.palo.common.FeMetaVersion;
import com.baidu.palo.common.Pair;
import com.baidu.palo.common.util.Daemon;
import com.baidu.palo.system.Backend.BackendState;
import com.baidu.palo.system.BackendEvent.BackendEventType;
import com.baidu.palo.thrift.HeartbeatService;
import com.baidu.palo.thrift.TBackendInfo;
import com.baidu.palo.thrift.THeartbeatResult;
import com.baidu.palo.thrift.TMasterInfo;
import com.baidu.palo.thrift.TNetworkAddress;
import com.baidu.palo.thrift.TStatusCode;
import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;

public class SystemInfoService extends Daemon {
public static final String DEFAULT_CLUSTER = "default_cluster";
private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
Expand Down