From 59209d6e9b29a0742e6b18174e26d92b9a2ae408 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Wed, 28 Apr 2021 14:35:09 +0800 Subject: [PATCH 1/8] Fix the replay error in stream load manager The previous replay logic does not record the size of the map, which eventually resulted in EOF when reading the log. This pr replaces the replay logic directly with json. At the same time, the replay logic of image is supplemented. The pr ensure that the attributes 'lastStreamLoadTime' of backend can be correctly recorded in the image. Fixed #5729 --- .../apache/doris/common/FeMetaVersion.java | 4 +- .../doris/load/StreamLoadRecordMgr.java | 51 +++++++------------ .../java/org/apache/doris/system/Backend.java | 7 ++- 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 5f4d443fe41145..748df5d6ff5d23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -208,6 +208,8 @@ public final class FeMetaVersion { public static final int VERSION_97 = 97; // add list partition public static final int VERSION_98 = 98; + // add audit steam load + public static final int VERSION_99 = 99; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_98; + public static final int VERSION_CURRENT = VERSION_99; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index 9623a0e1dad331..ea7a7e3053bfe0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -17,8 +17,6 @@ package org.apache.doris.load; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; import org.apache.doris.analysis.ShowStreamLoadStmt.StreamLoadState; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -26,9 +24,11 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.plugin.StreamLoadAuditEvent; @@ -38,11 +38,14 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStreamLoadRecord; import org.apache.doris.thrift.TStreamLoadRecordResult; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import com.google.common.collect.Maps; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; @@ -53,12 +56,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.PriorityQueue; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import com.google.gson.annotations.SerializedName; + public class StreamLoadRecordMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class); @@ -313,13 +318,14 @@ public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRec for (Backend backend : backends.values()) { if (beIdToLastStreamLoad.containsKey(backend.getId())) { long lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId()); - LOG.info("Replay stream load bdbje. backend: {}, last stream load version: {}", backend.getHost(), lastStreamLoadTime); + LOG.info("Replay stream load bdbje. backend: {}, last stream load time: {}", backend.getHost(), lastStreamLoadTime); backend.setLastStreamLoadTime(lastStreamLoadTime); } } } public static class FetchStreamLoadRecord implements Writable { + @SerializedName("beIdToLastStreamLoad") private Map beIdToLastStreamLoad; public FetchStreamLoadRecord(Map beIdToLastStreamLoad) { @@ -336,34 +342,13 @@ public Map getBeIdToLastStreamLoad() { @Override public void write(DataOutput out) throws IOException { - for (Map.Entry entry : beIdToLastStreamLoad.entrySet()) { - out.writeBoolean(true); - out.writeLong(entry.getKey()); - out.writeBoolean(true); - out.writeLong(entry.getValue()); - LOG.debug("Write stream load bdbje. key: {}, value: {} ", entry.getKey(), entry.getValue()); - } + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } public static FetchStreamLoadRecord read(DataInput in) throws IOException { - Map idToLastStreamLoad = Maps.newHashMap(); - int beNum = Catalog.getCurrentSystemInfo().getIdToBackend().size(); - for (int i = 0; i < beNum; i++) { - long beId = -1; - long lastStreamLoad = -1; - if (in.readBoolean()) { - beId = in.readLong(); - } - if (in.readBoolean()) { - lastStreamLoad = in.readLong(); - } - if (beId != -1 && lastStreamLoad != -1) { - idToLastStreamLoad.put(beId, lastStreamLoad); - } - LOG.debug("Read stream load bdbje. key: {}, value: {} ", beId, lastStreamLoad); - } - FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(idToLastStreamLoad); - return fetchStreamLoadRecord; + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, FetchStreamLoadRecord.class); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index b64e030f740949..6a68cd22032f75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -26,12 +26,12 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.system.HeartbeatResponse.HbStatus; import org.apache.doris.thrift.TDisk; +import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.doris.thrift.TStorageMedium; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -517,6 +517,7 @@ public void write(DataOutput out) throws IOException { out.writeInt(decommissionType); out.writeInt(brpcPort); + out.writeLong(lastStreamLoadTime); } public void readFields(DataInput in) throws IOException { @@ -562,6 +563,10 @@ public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_40) { brpcPort = in.readInt(); } + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_99) { + lastStreamLoadTime = in.readLong(); + } } @Override From 7d0b9962c743f9ed90c85044dfe92275d436a70d Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 29 Apr 2021 11:11:06 +0800 Subject: [PATCH 2/8] Add config enable_stream_load_record False: disable stream load record The default is disable. --- be/src/common/config.h | 3 +++ be/src/http/action/stream_load.cpp | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 023c48edb0e573..3c0a408413a6fd 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -355,6 +355,9 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. CONF_mBool(tablet_writer_ignore_eovercrowded, "false"); +// Whether to enable stream load record function, the default is false. +// False: disable stream load record +CONF_mBool(enable_stream_load_record, "false"); // batch size of stream load record reported to FE CONF_mInt32(stream_load_record_batch_size, "50"); // expire time of stream load record in rocksdb. diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index ca569bfc94b67d..6eb64f2d876d2f 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -163,7 +163,9 @@ void StreamLoadAction::handle(HttpRequest* req) { HttpChannel::send_reply(req, str); #ifndef BE_TEST str = ctx->prepare_stream_load_record(str); - _sava_stream_load_record(ctx, str); + if (config::enable_stream_load_record) { + _sava_stream_load_record(ctx, str); + } #endif // update statstics streaming_load_requests_total->increment(1); From 57f51016a3c0d298159c23931caf655ffb4d72ec Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 29 Apr 2021 11:14:41 +0800 Subject: [PATCH 3/8] debug --- .../org/apache/doris/load/StreamLoadRecordMgr.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index ea7a7e3053bfe0..fb713d68b8b9c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -236,12 +236,14 @@ protected void runAfterCatalogReady() { TStreamLoadRecord streamLoadItem= entry.getValue(); String startTime = TimeUtils.longToTimeString(streamLoadItem.getStartTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); - LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + - " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," + - " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.", - backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), - streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), - streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime); + if (LOG.isDebugEnabled()) { + LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + + " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," + + " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.", + backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), + streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), + streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime); + } AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH) .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl()) From c7b24b0e7567f4c9c6a93d887d35f29db3ed0fe6 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 30 Apr 2021 12:25:59 +0800 Subject: [PATCH 4/8] Fix --- .../org/apache/doris/catalog/DiskInfo.java | 27 +++-- .../apache/doris/common/FeMetaVersion.java | 2 +- .../apache/doris/persist/gson/GsonUtils.java | 102 ++++++++++++++++-- .../java/org/apache/doris/system/Backend.java | 74 ++++++------- .../org/apache/doris/catalog/BackendTest.java | 39 +++++-- 5 files changed, 175 insertions(+), 69 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java index 4a7f458551800d..284f332912cf5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -21,6 +21,8 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.system.Backend; import org.apache.doris.thrift.TStorageMedium; import org.apache.logging.log4j.LogManager; @@ -30,6 +32,8 @@ import java.io.DataOutput; import java.io.IOException; +import com.google.gson.annotations.SerializedName; + public class DiskInfo implements Writable { private static final Logger LOG = LogManager.getLogger(DiskInfo.class); @@ -40,10 +44,15 @@ public enum DiskState { private static final long DEFAULT_CAPACITY_B = 1024 * 1024 * 1024 * 1024L; // 1T + @SerializedName("rootPath") private String rootPath; + @SerializedName("totalCapacityB") private long totalCapacityB; + @SerializedName("dataUsedCapacityB") private long dataUsedCapacityB; + @SerializedName("diskAvailableCapacityB") private long diskAvailableCapacityB; + @SerializedName("state") private DiskState state; // path hash and storage medium are reported from Backend and no need to persist @@ -155,11 +164,8 @@ public String toString() { @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, rootPath); - out.writeLong(totalCapacityB); - out.writeLong(dataUsedCapacityB); - out.writeLong(diskAvailableCapacityB); - Text.writeString(out, state.name()); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } public void readFields(DataInput in) throws IOException { @@ -177,8 +183,13 @@ public void readFields(DataInput in) throws IOException { } public static DiskInfo read(DataInput in) throws IOException { - DiskInfo diskInfo = new DiskInfo(); - diskInfo.readFields(in); - return diskInfo; + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_99) { + DiskInfo diskInfo = new DiskInfo(); + diskInfo.readFields(in); + return diskInfo; + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, DiskInfo.class); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 748df5d6ff5d23..b26f372cfb8a59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -208,7 +208,7 @@ public final class FeMetaVersion { public static final int VERSION_97 = 97; // add list partition public static final int VERSION_98 = 98; - // add audit steam load + // add audit steam load and change the serialization backend method to json public static final int VERSION_99 = 99; // note: when increment meta version, should assign the latest version to VERSION_CURRENT public static final int VERSION_CURRENT = VERSION_99; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 575385b413150c..001955f95f7f98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -21,8 +21,8 @@ import org.apache.doris.alter.RollupJobV2; import org.apache.doris.alter.SchemaChangeJobV2; import org.apache.doris.catalog.DistributionInfo; -import org.apache.doris.catalog.OdbcCatalogResource; import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.OdbcCatalogResource; import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.ScalarType; @@ -34,10 +34,22 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Table; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + import com.google.gson.ExclusionStrategy; import com.google.gson.FieldAttributes; import com.google.gson.Gson; @@ -57,14 +69,6 @@ import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; -import java.io.IOException; -import java.lang.reflect.Method; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - /* * Some utilities about Gson. * User should get GSON instance from this class to do the serialization. @@ -124,7 +128,9 @@ public class GsonUtils { .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory) .registerTypeAdapterFactory(resourceTypeAdapterFactory) .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory) - .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory); + .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory) + .registerTypeAdapterFactory(new ImmutableMapTypeAdapterFactory()) + .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()); // this instance is thread-safe. public static final Gson GSON = GSON_BUILDER.create(); @@ -326,6 +332,82 @@ public Multimap deserialize(JsonElement json, Type typeOfT, JsonDeserializ } } + private static class AtomicBooleanAdapter + implements JsonSerializer, JsonDeserializer { + + @Override + public AtomicBoolean deserialize(JsonElement jsonElement, Type type, + JsonDeserializationContext jsonDeserializationContext) + throws JsonParseException { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + boolean value = jsonObject.get("boolean").getAsBoolean(); + return new AtomicBoolean(value); + } + + @Override + public JsonElement serialize(AtomicBoolean atomicBoolean, Type type, + JsonSerializationContext jsonSerializationContext) { + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("boolean", atomicBoolean.get()); + return jsonObject; + } + } + + private static class ImmutableMapAdapter implements JsonSerializer>, + JsonDeserializer> { + + @Override + public ImmutableMap deserialize(JsonElement jsonElement, Type type, + JsonDeserializationContext jsonDeserializationContext) + throws JsonParseException { + ImmutableMap.Builder resultBuilder = ImmutableMap.builder(); + JsonObject jsonObject = jsonElement.getAsJsonObject(); + JsonElement mapElement = jsonObject.get("map"); + Map asMap = jsonDeserializationContext.deserialize(mapElement, HashMap.class); + for (Map.Entry entry : asMap.entrySet()) { + resultBuilder.put(entry.getKey(), entry.getValue()); + } + return resultBuilder.build(); + } + + @Override + public JsonElement serialize(ImmutableMap kvImmutableMap, Type typeOfSrc, + JsonSerializationContext jsonSerializationContext) { + JsonObject jsonObject = new JsonObject(); + Map asMap = Maps.newHashMap(); + for (Map.Entry entry: kvImmutableMap.entrySet()) { + asMap.put(entry.getKey(), entry.getValue()); + } + + JsonElement jsonElement = jsonSerializationContext.serialize(asMap, HashMap.class); + jsonObject.add("map", jsonElement); + return jsonObject; + } + } + + public static class ImmutableMapTypeAdapterFactory implements TypeAdapterFactory { + + @Override + public TypeAdapter create(Gson gson, TypeToken type) { + if (!ImmutableMap.class.isAssignableFrom(type.getRawType())) { + return null; + } + final TypeAdapter delegate = gson.getDelegateAdapter(this, type); + return new TypeAdapter() { + @Override + public void write(JsonWriter out, T value) throws IOException { + delegate.write(out, value); + } + + @Override + @SuppressWarnings("unchecked") + public T read(JsonReader in) throws IOException { + return (T) ImmutableMap.copyOf((Map) delegate.read(in)); + } + }; + } + } + public static class PostProcessTypeAdapterFactory implements TypeAdapterFactory { public PostProcessTypeAdapterFactory() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 6a68cd22032f75..963843d888e566 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -24,6 +24,8 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.load.StreamLoadRecordMgr; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.system.HeartbeatResponse.HbStatus; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TStorageMedium; @@ -43,6 +45,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.gson.annotations.SerializedName; + /** * This class extends the primary identifier of a Backend with ephemeral state, * eg usage information, current administrative state etc. @@ -57,28 +61,43 @@ public enum BackendState { private static final Logger LOG = LogManager.getLogger(Backend.class); + @SerializedName("id") private long id; + @SerializedName("host") private String host; private String version; + @SerializedName("heartbeatPort") private int heartbeatPort; // heartbeat + @SerializedName("bePort") private volatile int bePort; // be + @SerializedName("httpPort") private volatile int httpPort; // web service + @SerializedName("beRpcPort") private volatile int beRpcPort; // be rpc port + @SerializedName("brpcPort") private volatile int brpcPort = -1; + @SerializedName("lastUpdateMs") private volatile long lastUpdateMs; + @SerializedName("lastStartTime") private volatile long lastStartTime; + @SerializedName("isAlive") private AtomicBoolean isAlive; + @SerializedName("isDecommissioned") private AtomicBoolean isDecommissioned; + @SerializedName("decommissionType") private volatile int decommissionType; + @SerializedName("ownerClusterName") private volatile String ownerClusterName; // to index the state in some cluster + @SerializedName("backendState") private volatile int backendState; // private BackendState backendState; // rootPath -> DiskInfo + @SerializedName("disksRef") private volatile ImmutableMap disksRef; private String heartbeatErrMsg = ""; @@ -93,10 +112,9 @@ public enum BackendState { private volatile long tabletMaxCompactionScore = 0; // additional backendStatus information for BE, display in JSON format + @SerializedName("backendStatus") private BackendStatus backendStatus = new BackendStatus(); - private long lastStreamLoadTime = -1; - public Backend() { this.host = ""; this.version = ""; @@ -114,8 +132,6 @@ public Backend() { this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); - - this.lastStreamLoadTime = -1; } public Backend(long id, String host, int heartbeatPort) { @@ -136,8 +152,6 @@ public Backend(long id, String host, int heartbeatPort) { this.ownerClusterName = ""; this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); - - this.lastStreamLoadTime = -1; } public long getId() { @@ -176,10 +190,10 @@ public String getHeartbeatErrMsg() { return heartbeatErrMsg; } - public long getLastStreamLoadTime() { return lastStreamLoadTime; } + public long getLastStreamLoadTime() { return this.backendStatus.lastStreamLoadTime; } public void setLastStreamLoadTime(long lastStreamLoadTime) { - this.lastStreamLoadTime = lastStreamLoadTime; + this.backendStatus.lastStreamLoadTime = lastStreamLoadTime; } // for test only @@ -487,37 +501,19 @@ public void updateDisks(Map backendDisks) { } public static Backend read(DataInput in) throws IOException { - Backend backend = new Backend(); - backend.readFields(in); - return backend; + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_99) { + Backend backend = new Backend(); + backend.readFields(in); + return backend; + } + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, Backend.class); } @Override public void write(DataOutput out) throws IOException { - out.writeLong(id); - Text.writeString(out, host); - out.writeInt(heartbeatPort); - out.writeInt(bePort); - out.writeInt(httpPort); - out.writeInt(beRpcPort); - out.writeBoolean(isAlive.get()); - out.writeBoolean(isDecommissioned.get()); - out.writeLong(lastUpdateMs); - out.writeLong(lastStartTime); - - ImmutableMap disks = disksRef; - out.writeInt(disks.size()); - for (Map.Entry entry : disks.entrySet()) { - Text.writeString(out, entry.getKey()); - entry.getValue().write(out); - } - - Text.writeString(out, ownerClusterName); - out.writeInt(backendState); - out.writeInt(decommissionType); - - out.writeInt(brpcPort); - out.writeLong(lastStreamLoadTime); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } public void readFields(DataInput in) throws IOException { @@ -563,10 +559,6 @@ public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_40) { brpcPort = in.readInt(); } - - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_99) { - lastStreamLoadTime = in.readLong(); - } } @Override @@ -701,6 +693,10 @@ private int getDiskNum() { public class BackendStatus { // this will be output as json, so not using FeConstants.null_string; public String lastSuccessReportTabletsTime = "N/A"; + @SerializedName("lastStreamLoadTime") + // the last time when the stream load status was reported by backend + private long lastStreamLoadTime = -1; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java index 2168f89ea08bca..285136c3c534c7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java @@ -22,6 +22,8 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDisk; +import com.google.common.collect.ImmutableMap; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -116,7 +118,12 @@ public void testSerialization() throws Exception { List list1 = new LinkedList(); List list2 = new LinkedList(); - + + DiskInfo diskInfo1 = new DiskInfo("/disk1"); + DiskInfo diskInfo2 = new DiskInfo("/disk2"); + ImmutableMap diskRefs = ImmutableMap.of( + "disk1", diskInfo1, + "disk2", diskInfo2); for (int count = 0; count < 100; ++count) { Backend backend = new Backend(count, "10.120.22.32" + count, 6000 + count); backend.updateOnce(7000 + count, 9000 + count, beRpcPort); @@ -125,6 +132,8 @@ public void testSerialization() throws Exception { for (int count = 100; count < 200; count++) { Backend backend = new Backend(count, "10.120.22.32" + count, 6000 + count); backend.updateOnce(7000 + count, 9000 + count, beRpcPort); + backend.setDisks(diskRefs); + backend.setLastStreamLoadTime(count); list1.add(backend); } for (Backend backend : list1) { @@ -135,21 +144,29 @@ public void testSerialization() throws Exception { // 2. Read objects from file DataInputStream dis = new DataInputStream(new FileInputStream(file)); - for (int count = 0; count < 100; ++count) { - Backend backend = new Backend(); - backend.readFields(dis); - list2.add(backend); - Assert.assertEquals(count, backend.getId()); - Assert.assertEquals("10.120.22.32" + count, backend.getHost()); - } - - for (int count = 100; count < 200; ++count) { + for (int count = 0; count < 200; ++count) { Backend backend = Backend.read(dis); list2.add(backend); Assert.assertEquals(count, backend.getId()); Assert.assertEquals("10.120.22.32" + count, backend.getHost()); } - + + // check isAlive + Backend backend100 = list2.get(100); + Assert.assertTrue(backend100.isAlive()); + // check disksRef + ImmutableMap backend100DiskRef = backend100.getDisks(); + Assert.assertEquals(2, backend100DiskRef.size()); + Assert.assertTrue(backend100DiskRef.containsKey("disk1")); + Assert.assertTrue(backend100DiskRef.containsKey("disk2")); + DiskInfo backend100DiskInfo1 = backend100DiskRef.get("disk1"); + Assert.assertEquals("/disk1", backend100DiskInfo1.getRootPath()); + DiskInfo backend100DiskInfo2 = backend100DiskRef.get("disk2"); + Assert.assertEquals("/disk2", backend100DiskInfo2.getRootPath()); + // check backend status + Backend.BackendStatus backend100BackendStatus = backend100.getBackendStatus(); + Assert.assertEquals(100, backend100BackendStatus.lastSuccessReportTabletsTime); + for (int count = 0; count < 200; count++) { Assert.assertTrue(list1.get(count).equals(list2.get(count))); } From 4468ea7206761fd7dafc575627a02843df2e7fa3 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 30 Apr 2021 15:47:40 +0800 Subject: [PATCH 5/8] fix --- .../apache/doris/persist/gson/GsonUtils.java | 62 +++---------------- .../java/org/apache/doris/system/Backend.java | 3 +- .../org/apache/doris/catalog/BackendTest.java | 16 ++--- 3 files changed, 20 insertions(+), 61 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 001955f95f7f98..7176ad674313b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -68,6 +68,7 @@ import com.google.gson.reflect.TypeToken; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; +import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl; /* * Some utilities about Gson. @@ -129,7 +130,7 @@ public class GsonUtils { .registerTypeAdapterFactory(resourceTypeAdapterFactory) .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory) .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory) - .registerTypeAdapterFactory(new ImmutableMapTypeAdapterFactory()) + .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()); // this instance is thread-safe. @@ -353,58 +354,15 @@ public JsonElement serialize(AtomicBoolean atomicBoolean, Type type, } } - private static class ImmutableMapAdapter implements JsonSerializer>, - JsonDeserializer> { - - @Override - public ImmutableMap deserialize(JsonElement jsonElement, Type type, - JsonDeserializationContext jsonDeserializationContext) - throws JsonParseException { - ImmutableMap.Builder resultBuilder = ImmutableMap.builder(); - JsonObject jsonObject = jsonElement.getAsJsonObject(); - JsonElement mapElement = jsonObject.get("map"); - Map asMap = jsonDeserializationContext.deserialize(mapElement, HashMap.class); - for (Map.Entry entry : asMap.entrySet()) { - resultBuilder.put(entry.getKey(), entry.getValue()); - } - return resultBuilder.build(); - } - - @Override - public JsonElement serialize(ImmutableMap kvImmutableMap, Type typeOfSrc, - JsonSerializationContext jsonSerializationContext) { - JsonObject jsonObject = new JsonObject(); - Map asMap = Maps.newHashMap(); - for (Map.Entry entry: kvImmutableMap.entrySet()) { - asMap.put(entry.getKey(), entry.getValue()); - } - - JsonElement jsonElement = jsonSerializationContext.serialize(asMap, HashMap.class); - jsonObject.add("map", jsonElement); - return jsonObject; - } - } - - public static class ImmutableMapTypeAdapterFactory implements TypeAdapterFactory { - + public final static class ImmutableMapDeserializer implements JsonDeserializer> { @Override - public TypeAdapter create(Gson gson, TypeToken type) { - if (!ImmutableMap.class.isAssignableFrom(type.getRawType())) { - return null; - } - final TypeAdapter delegate = gson.getDelegateAdapter(this, type); - return new TypeAdapter() { - @Override - public void write(JsonWriter out, T value) throws IOException { - delegate.write(out, value); - } - - @Override - @SuppressWarnings("unchecked") - public T read(JsonReader in) throws IOException { - return (T) ImmutableMap.copyOf((Map) delegate.read(in)); - } - }; + public ImmutableMap deserialize(final JsonElement json, final Type type, + final JsonDeserializationContext context) throws JsonParseException + { + final Type type2 = + ParameterizedTypeImpl.make(Map.class, ((ParameterizedType) type).getActualTypeArguments(), null); + final Map map = context.deserialize(json, type2); + return ImmutableMap.copyOf(map); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 963843d888e566..83fbe621249cd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -504,6 +504,7 @@ public static Backend read(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_99) { Backend backend = new Backend(); backend.readFields(in); + LOG.info("read backend by old version"); return backend; } String json = Text.readString(in); @@ -695,7 +696,7 @@ public class BackendStatus { public String lastSuccessReportTabletsTime = "N/A"; @SerializedName("lastStreamLoadTime") // the last time when the stream load status was reported by backend - private long lastStreamLoadTime = -1; + public long lastStreamLoadTime = -1; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java index 285136c3c534c7..34c4fb4403af72 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java @@ -24,10 +24,6 @@ import com.google.common.collect.ImmutableMap; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -38,6 +34,10 @@ import java.util.List; import java.util.Map; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + public class BackendTest { private Backend backend; private long backendId = 9999; @@ -165,7 +165,7 @@ public void testSerialization() throws Exception { Assert.assertEquals("/disk2", backend100DiskInfo2.getRootPath()); // check backend status Backend.BackendStatus backend100BackendStatus = backend100.getBackendStatus(); - Assert.assertEquals(100, backend100BackendStatus.lastSuccessReportTabletsTime); + Assert.assertEquals(100, backend100BackendStatus.lastStreamLoadTime); for (int count = 0; count < 200; count++) { Assert.assertTrue(list1.get(count).equals(list2.get(count))); @@ -191,12 +191,12 @@ public void testSerialization() throws Exception { back2 = new Backend(1, "a", 2); back2.updateOnce(1, 1, 1); Assert.assertFalse(back1.equals(back2)); - + Assert.assertEquals("Backend [id=1, host=a, heartbeatPort=1, alive=true]", back1.toString()); - + // 3. delete files dis.close(); file.delete(); } - + } From 0b8604409e4ed57f2111432d0f3de8fadbfc5891 Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 30 Apr 2021 15:58:03 +0800 Subject: [PATCH 6/8] remove --- fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java | 1 - .../src/main/java/org/apache/doris/persist/gson/GsonUtils.java | 1 - fe/fe-core/src/main/java/org/apache/doris/system/Backend.java | 1 - 3 files changed, 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java index 284f332912cf5d..ee087909bb42d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -22,7 +22,6 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.system.Backend; import org.apache.doris.thrift.TStorageMedium; import org.apache.logging.log4j.LogManager; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 7176ad674313b8..e5b86f514a20b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.LinkedHashMultimap; import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 83fbe621249cd1..d8fd918af2c296 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -24,7 +24,6 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.load.StreamLoadRecordMgr; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.system.HeartbeatResponse.HbStatus; import org.apache.doris.thrift.TDisk; From 24c752a74da2a2634180af89d30bd7fbd8f3ca3c Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 30 Apr 2021 17:30:41 +0800 Subject: [PATCH 7/8] fix --- be/src/http/action/stream_load.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 6eb64f2d876d2f..eb6691469b7a6d 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -162,8 +162,8 @@ void StreamLoadAction::handle(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); #ifndef BE_TEST - str = ctx->prepare_stream_load_record(str); if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); _sava_stream_load_record(ctx, str); } #endif From 3cd0cecdcce6f233db79766610f357c07fd0b9ba Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 30 Apr 2021 17:55:25 +0800 Subject: [PATCH 8/8] Add test --- .../java/org/apache/doris/system/Backend.java | 1 - .../apache/doris/catalog/DiskInfoTest.java | 85 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/catalog/DiskInfoTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index d8fd918af2c296..8e68359b306b4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -503,7 +503,6 @@ public static Backend read(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_99) { Backend backend = new Backend(); backend.readFields(in); - LOG.info("read backend by old version"); return backend; } String json = Text.readString(in); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DiskInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DiskInfoTest.java new file mode 100644 index 00000000000000..666bedc2feec1c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DiskInfoTest.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.analysis.AccessTestUtil; +import org.apache.doris.common.FeConstants; +import org.apache.doris.thrift.TStorageMedium; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class DiskInfoTest { + + private Catalog catalog; + + private FakeCatalog fakeCatalog; + private FakeEditLog fakeEditLog; + + @Before + public void setUp() { + catalog = AccessTestUtil.fetchAdminCatalog(); + + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + + FakeCatalog.setCatalog(catalog); + FakeCatalog.setMetaVersion(FeConstants.meta_version); + FakeCatalog.setSystemInfo(AccessTestUtil.fetchSystemInfoService()); + } + + @Test + public void testSerialization() throws IOException { + // write disk info to file + File file = new File("./diskInfoTest"); + file.createNewFile(); + DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); + + DiskInfo diskInfo1 = new DiskInfo("/disk1"); + // 1 GB + long totalCapacityB = 1024 * 1024 * 1024L; + diskInfo1.setTotalCapacityB(totalCapacityB); + // 1 MB + long dataUsedCapacityB = 1024 * 1024L; + diskInfo1.setDataUsedCapacityB(dataUsedCapacityB); + // without serialize + diskInfo1.setStorageMedium(TStorageMedium.SSD); + + diskInfo1.write(dos); + dos.flush(); + dos.close(); + + // read disk info from file + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + DiskInfo result = DiskInfo.read(dis); + + // check + Assert.assertEquals("/disk1", result.getRootPath()); + Assert.assertEquals(totalCapacityB, result.getTotalCapacityB()); + Assert.assertEquals(dataUsedCapacityB, result.getDataUsedCapacityB()); + Assert.assertTrue(result.getStorageMedium() == null); + } +}