diff --git a/be/src/common/config.h b/be/src/common/config.h
index 023c48edb0e573..3c0a408413a6fd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -355,6 +355,9 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
 CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
 CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
+// Whether to enable the stream load record function. The default is false.
+// False: disable stream load record
+CONF_mBool(enable_stream_load_record, "false");
 // batch size of stream load record reported to FE
 CONF_mInt32(stream_load_record_batch_size, "50");
 // expire time of stream load record in rocksdb.
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index ca569bfc94b67d..eb6691469b7a6d 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -162,8 +162,10 @@ void StreamLoadAction::handle(HttpRequest* req) {
     str = str + '\n';
     HttpChannel::send_reply(req, str);
 #ifndef BE_TEST
-    str = ctx->prepare_stream_load_record(str);
-    _sava_stream_load_record(ctx, str);
+    if (config::enable_stream_load_record) {
+        str = ctx->prepare_stream_load_record(str);
+        _sava_stream_load_record(ctx, str);
+    }
 #endif
     // update statstics
     streaming_load_requests_total->increment(1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
index 4a7f458551800d..ee087909bb42d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -21,6 +21,7 @@
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TStorageMedium;
 
 import org.apache.logging.log4j.LogManager;
@@ -30,6 +31,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import com.google.gson.annotations.SerializedName;
+
 public class DiskInfo implements Writable {
     private static final Logger LOG = LogManager.getLogger(DiskInfo.class);
 
@@ -40,10 +43,15 @@ public enum DiskState {
 
     private static final long DEFAULT_CAPACITY_B = 1024 * 1024 * 1024 * 1024L; // 1T
 
+    @SerializedName("rootPath")
     private String rootPath;
+    @SerializedName("totalCapacityB")
     private long totalCapacityB;
+    @SerializedName("dataUsedCapacityB")
     private long dataUsedCapacityB;
+    @SerializedName("diskAvailableCapacityB")
     private long diskAvailableCapacityB;
+    @SerializedName("state")
     private DiskState state;
 
     // path hash and storage medium are reported from Backend and no need to persist
@@ -155,11 +163,8 @@ public String toString() {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        Text.writeString(out, rootPath);
-        out.writeLong(totalCapacityB);
-        out.writeLong(dataUsedCapacityB);
-        out.writeLong(diskAvailableCapacityB);
-        Text.writeString(out, state.name());
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -177,8 +182,13 @@ public void readFields(DataInput in) throws IOException {
     }
 
     public static DiskInfo read(DataInput in) throws IOException {
-        DiskInfo diskInfo = new DiskInfo();
-        diskInfo.readFields(in);
-        return diskInfo;
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_99) {
+            DiskInfo diskInfo = new DiskInfo();
+            diskInfo.readFields(in);
+            return diskInfo;
+        } else {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, DiskInfo.class);
+        }
    }
 }
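A minimal sketch of the round trip the new `write`/`read` pair performs once the journal version reaches `VERSION_99` (not part of the patch; the default capacity values and the `ONLINE` state are assumptions based on `DEFAULT_CAPACITY_B` and the constructor):

```java
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.persist.gson.GsonUtils;

public class DiskInfoJsonSketch {
    public static void main(String[] args) {
        DiskInfo disk = new DiskInfo("/disk1");
        // write() boils down to Text.writeString(out, GsonUtils.GSON.toJson(this)),
        // i.e. one length-prefixed JSON string per DiskInfo, roughly:
        // {"rootPath":"/disk1","totalCapacityB":1099511627776,
        //  "dataUsedCapacityB":0,"diskAvailableCapacityB":1099511627776,"state":"ONLINE"}
        String json = GsonUtils.GSON.toJson(disk);
        // read() applies the reverse mapping; field names come from @SerializedName.
        DiskInfo copy = GsonUtils.GSON.fromJson(json, DiskInfo.class);
        System.out.println(copy.getRootPath()); // /disk1
    }
}
```

Note that the write side is unconditional: any image written by this FE version can only be read back through the JSON branch, which is why `VERSION_CURRENT` is bumped in the same patch (next file).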
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 5f4d443fe41145..b26f372cfb8a59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -208,6 +208,8 @@ public final class FeMetaVersion {
     public static final int VERSION_97 = 97;
     // add list partition
     public static final int VERSION_98 = 98;
+    // add stream load audit and change the Backend serialization method to JSON
+    public static final int VERSION_99 = 99;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_98;
+    public static final int VERSION_CURRENT = VERSION_99;
 }
SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); - LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + - " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," + - " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.", - backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), - streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), - streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime); + if (LOG.isDebugEnabled()) { + LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + + " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," + + " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.", + backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), + streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), + streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime); + } AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH) .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl()) @@ -313,13 +320,14 @@ public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRec for (Backend backend : backends.values()) { if (beIdToLastStreamLoad.containsKey(backend.getId())) { long lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId()); - LOG.info("Replay stream load bdbje. backend: {}, last stream load version: {}", backend.getHost(), lastStreamLoadTime); + LOG.info("Replay stream load bdbje. backend: {}, last stream load time: {}", backend.getHost(), lastStreamLoadTime); backend.setLastStreamLoadTime(lastStreamLoadTime); } } } public static class FetchStreamLoadRecord implements Writable { + @SerializedName("beIdToLastStreamLoad") private Map beIdToLastStreamLoad; public FetchStreamLoadRecord(Map beIdToLastStreamLoad) { @@ -336,34 +344,13 @@ public Map getBeIdToLastStreamLoad() { @Override public void write(DataOutput out) throws IOException { - for (Map.Entry entry : beIdToLastStreamLoad.entrySet()) { - out.writeBoolean(true); - out.writeLong(entry.getKey()); - out.writeBoolean(true); - out.writeLong(entry.getValue()); - LOG.debug("Write stream load bdbje. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 575385b413150c..e5b86f514a20b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -21,8 +21,8 @@
 import org.apache.doris.alter.RollupJobV2;
 import org.apache.doris.alter.SchemaChangeJobV2;
 import org.apache.doris.catalog.DistributionInfo;
-import org.apache.doris.catalog.OdbcCatalogResource;
 import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.OdbcCatalogResource;
 import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.ScalarType;
@@ -34,10 +34,21 @@
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Table;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import com.google.gson.ExclusionStrategy;
 import com.google.gson.FieldAttributes;
 import com.google.gson.Gson;
@@ -56,14 +67,7 @@
 import com.google.gson.reflect.TypeToken;
 import com.google.gson.stream.JsonReader;
 import com.google.gson.stream.JsonWriter;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
 
 /*
  * Some utilities about Gson.
@@ -124,7 +128,9 @@ public class GsonUtils {
             .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory)
             .registerTypeAdapterFactory(resourceTypeAdapterFactory)
             .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
-            .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory);
+            .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
+            .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
+            .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter());
 
     // this instance is thread-safe.
     public static final Gson GSON = GSON_BUILDER.create();
@@ -326,6 +332,39 @@ public Multimap deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
         }
     }
 
+    private static class AtomicBooleanAdapter
+            implements JsonSerializer<AtomicBoolean>, JsonDeserializer<AtomicBoolean> {
+
+        @Override
+        public AtomicBoolean deserialize(JsonElement jsonElement, Type type,
+                                         JsonDeserializationContext jsonDeserializationContext)
+                throws JsonParseException {
+            JsonObject jsonObject = jsonElement.getAsJsonObject();
+            boolean value = jsonObject.get("boolean").getAsBoolean();
+            return new AtomicBoolean(value);
+        }
+
+        @Override
+        public JsonElement serialize(AtomicBoolean atomicBoolean, Type type,
+                                     JsonSerializationContext jsonSerializationContext) {
+            JsonObject jsonObject = new JsonObject();
+            jsonObject.addProperty("boolean", atomicBoolean.get());
+            return jsonObject;
+        }
+    }
+
+    public final static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?, ?>> {
+        @Override
+        public ImmutableMap<?, ?> deserialize(final JsonElement json, final Type type,
+                final JsonDeserializationContext context) throws JsonParseException {
+            final Type type2 =
+                    ParameterizedTypeImpl.make(Map.class, ((ParameterizedType) type).getActualTypeArguments(), null);
+            final Map<?, ?> map = context.deserialize(json, type2);
+            return ImmutableMap.copyOf(map);
+        }
+    }
+
     public static class PostProcessTypeAdapterFactory implements TypeAdapterFactory {
 
         public PostProcessTypeAdapterFactory() {
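The two new adapters exist for `Backend` (next file), whose persisted fields include `AtomicBoolean` flags and an `ImmutableMap<String, DiskInfo>`. A sketch of the JSON shapes they handle, under the assumption that the registrations above match parameterized lookups by raw type:

```java
import java.lang.reflect.Type;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.collect.ImmutableMap;
import com.google.gson.reflect.TypeToken;

import org.apache.doris.persist.gson.GsonUtils;

public class GsonAdapterSketch {
    public static void main(String[] args) {
        // AtomicBoolean <-> {"boolean":true}
        String s = GsonUtils.GSON.toJson(new AtomicBoolean(true));
        AtomicBoolean b = GsonUtils.GSON.fromJson(s, AtomicBoolean.class);

        // ImmutableMap is deserialized by first materializing a mutable Map with
        // the same type arguments, then ImmutableMap.copyOf(...).
        Type type = new TypeToken<ImmutableMap<String, Long>>() {}.getType();
        ImmutableMap<String, Long> m = GsonUtils.GSON.fromJson("{\"a\":1}", type);
        System.out.println(b.get() + " " + m); // true {a=1}
    }
}
```

One caveat: `ParameterizedTypeImpl` lives in the JDK-internal `sun.reflect.generics.reflectiveObjects` package, so the deserializer compiles against internal API; a hand-rolled `ParameterizedType` implementation would decouple it from the JDK at the cost of a few extra lines.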
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index b64e030f740949..8e68359b306b4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -24,14 +24,15 @@
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.system.HeartbeatResponse.HbStatus;
 import org.apache.doris.thrift.TDisk;
+import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.doris.thrift.TStorageMedium;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -43,6 +44,8 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.gson.annotations.SerializedName;
+
 /**
  * This class extends the primary identifier of a Backend with ephemeral state,
  * eg usage information, current administrative state etc.
@@ -57,28 +60,43 @@ public enum BackendState {
 
     private static final Logger LOG = LogManager.getLogger(Backend.class);
 
+    @SerializedName("id")
     private long id;
+    @SerializedName("host")
     private String host;
     private String version;
 
+    @SerializedName("heartbeatPort")
     private int heartbeatPort; // heartbeat
+    @SerializedName("bePort")
     private volatile int bePort; // be
+    @SerializedName("httpPort")
     private volatile int httpPort; // web service
+    @SerializedName("beRpcPort")
     private volatile int beRpcPort; // be rpc port
+    @SerializedName("brpcPort")
     private volatile int brpcPort = -1;
 
+    @SerializedName("lastUpdateMs")
     private volatile long lastUpdateMs;
+    @SerializedName("lastStartTime")
     private volatile long lastStartTime;
+    @SerializedName("isAlive")
     private AtomicBoolean isAlive;
 
+    @SerializedName("isDecommissioned")
     private AtomicBoolean isDecommissioned;
+    @SerializedName("decommissionType")
     private volatile int decommissionType;
+    @SerializedName("ownerClusterName")
     private volatile String ownerClusterName;
     // to index the state in some cluster
+    @SerializedName("backendState")
     private volatile int backendState;
     // private BackendState backendState;
 
     // rootPath -> DiskInfo
+    @SerializedName("disksRef")
     private volatile ImmutableMap<String, DiskInfo> disksRef;
 
     private String heartbeatErrMsg = "";
@@ -93,10 +111,9 @@ public enum BackendState {
     private volatile long tabletMaxCompactionScore = 0;
 
     // additional backendStatus information for BE, display in JSON format
+    @SerializedName("backendStatus")
     private BackendStatus backendStatus = new BackendStatus();
 
-    private long lastStreamLoadTime = -1;
-
     public Backend() {
         this.host = "";
         this.version = "";
@@ -114,8 +131,6 @@ public Backend() {
 
         this.backendState = BackendState.free.ordinal();
         this.decommissionType = DecommissionType.SystemDecommission.ordinal();
-
-        this.lastStreamLoadTime = -1;
     }
 
     public Backend(long id, String host, int heartbeatPort) {
@@ -136,8 +151,6 @@ public Backend(long id, String host, int heartbeatPort) {
         this.ownerClusterName = "";
         this.backendState = BackendState.free.ordinal();
         this.decommissionType = DecommissionType.SystemDecommission.ordinal();
-
-        this.lastStreamLoadTime = -1;
     }
 
     public long getId() {
@@ -176,10 +189,10 @@ public String getHeartbeatErrMsg() {
         return heartbeatErrMsg;
     }
 
-    public long getLastStreamLoadTime() { return lastStreamLoadTime; }
+    public long getLastStreamLoadTime() { return this.backendStatus.lastStreamLoadTime; }
 
     public void setLastStreamLoadTime(long lastStreamLoadTime) {
-        this.lastStreamLoadTime = lastStreamLoadTime;
+        this.backendStatus.lastStreamLoadTime = lastStreamLoadTime;
     }
 
     // for test only
@@ -487,36 +500,19 @@ public void updateDisks(Map<String, TDisk> backendDisks) {
     }
 
     public static Backend read(DataInput in) throws IOException {
-        Backend backend = new Backend();
-        backend.readFields(in);
-        return backend;
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_99) {
+            Backend backend = new Backend();
+            backend.readFields(in);
+            return backend;
+        }
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, Backend.class);
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeLong(id);
-        Text.writeString(out, host);
-        out.writeInt(heartbeatPort);
-        out.writeInt(bePort);
-        out.writeInt(httpPort);
-        out.writeInt(beRpcPort);
-        out.writeBoolean(isAlive.get());
-        out.writeBoolean(isDecommissioned.get());
-        out.writeLong(lastUpdateMs);
-        out.writeLong(lastStartTime);
-
-        ImmutableMap<String, DiskInfo> disks = disksRef;
-        out.writeInt(disks.size());
-        for (Map.Entry<String, DiskInfo> entry : disks.entrySet()) {
-            Text.writeString(out, entry.getKey());
-            entry.getValue().write(out);
-        }
-
-        Text.writeString(out, ownerClusterName);
-        out.writeInt(backendState);
-        out.writeInt(decommissionType);
-
-        out.writeInt(brpcPort);
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -696,6 +692,10 @@ private int getDiskNum() {
     public class BackendStatus {
         // this will be output as json, so not using FeConstants.null_string;
         public String lastSuccessReportTabletsTime = "N/A";
+        @SerializedName("lastStreamLoadTime")
+        // the last time when the stream load status was reported by the backend
+        public long lastStreamLoadTime = -1;
+
     }
 }
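Put together, a persisted `Backend` is now a single JSON document; a rough sketch of its shape (values illustrative, defaults assumed from the constructor; the nested `{"boolean": ...}` objects come from the `AtomicBoolean` adapter registered in GsonUtils above):

```java
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Backend;

public class BackendJsonSketch {
    public static void main(String[] args) {
        Backend be = new Backend(9999, "10.0.0.1", 9050);
        String json = GsonUtils.GSON.toJson(be);
        // roughly:
        // {"id":9999,"host":"10.0.0.1","heartbeatPort":9050,...,
        //  "isAlive":{"boolean":false},"isDecommissioned":{"boolean":false},
        //  "disksRef":{},"backendStatus":{"lastStreamLoadTime":-1}}
        Backend copy = GsonUtils.GSON.fromJson(json, Backend.class);
        System.out.println(copy.getHost()); // 10.0.0.1
    }
}
```

Fields without `@SerializedName` (`version`, `heartbeatErrMsg`, the report timestamps) stay out of the image, which matches the old hand-written format: it never persisted them either. Moving `lastStreamLoadTime` into `BackendStatus` also gets it displayed alongside the other JSON-formatted status output.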
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java
index 2168f89ea08bca..34c4fb4403af72 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java
@@ -22,9 +22,7 @@
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TDisk;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import com.google.common.collect.ImmutableMap;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -36,6 +34,10 @@
 import java.util.List;
 import java.util.Map;
 
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
 public class BackendTest {
     private Backend backend;
     private long backendId = 9999;
@@ -116,7 +118,12 @@ public void testSerialization() throws Exception {
 
         List<Backend> list1 = new LinkedList<Backend>();
         List<Backend> list2 = new LinkedList<Backend>();
-
+
+        DiskInfo diskInfo1 = new DiskInfo("/disk1");
+        DiskInfo diskInfo2 = new DiskInfo("/disk2");
+        ImmutableMap<String, DiskInfo> diskRefs = ImmutableMap.of(
+                "disk1", diskInfo1,
+                "disk2", diskInfo2);
         for (int count = 0; count < 100; ++count) {
             Backend backend = new Backend(count, "10.120.22.32" + count, 6000 + count);
             backend.updateOnce(7000 + count, 9000 + count, beRpcPort);
@@ -125,6 +132,8 @@ public void testSerialization() throws Exception {
         for (int count = 100; count < 200; count++) {
             Backend backend = new Backend(count, "10.120.22.32" + count, 6000 + count);
             backend.updateOnce(7000 + count, 9000 + count, beRpcPort);
+            backend.setDisks(diskRefs);
+            backend.setLastStreamLoadTime(count);
             list1.add(backend);
         }
         for (Backend backend : list1) {
@@ -135,21 +144,29 @@ public void testSerialization() throws Exception {
 
         // 2. Read objects from file
         DataInputStream dis = new DataInputStream(new FileInputStream(file));
-        for (int count = 0; count < 100; ++count) {
-            Backend backend = new Backend();
-            backend.readFields(dis);
-            list2.add(backend);
-            Assert.assertEquals(count, backend.getId());
-            Assert.assertEquals("10.120.22.32" + count, backend.getHost());
-        }
-
-        for (int count = 100; count < 200; ++count) {
+        for (int count = 0; count < 200; ++count) {
             Backend backend = Backend.read(dis);
             list2.add(backend);
             Assert.assertEquals(count, backend.getId());
             Assert.assertEquals("10.120.22.32" + count, backend.getHost());
         }
-
+
+        // check isAlive
+        Backend backend100 = list2.get(100);
+        Assert.assertTrue(backend100.isAlive());
+        // check disksRef
+        ImmutableMap<String, DiskInfo> backend100DiskRef = backend100.getDisks();
+        Assert.assertEquals(2, backend100DiskRef.size());
+        Assert.assertTrue(backend100DiskRef.containsKey("disk1"));
+        Assert.assertTrue(backend100DiskRef.containsKey("disk2"));
+        DiskInfo backend100DiskInfo1 = backend100DiskRef.get("disk1");
+        Assert.assertEquals("/disk1", backend100DiskInfo1.getRootPath());
+        DiskInfo backend100DiskInfo2 = backend100DiskRef.get("disk2");
+        Assert.assertEquals("/disk2", backend100DiskInfo2.getRootPath());
+        // check backend status
+        Backend.BackendStatus backend100BackendStatus = backend100.getBackendStatus();
+        Assert.assertEquals(100, backend100BackendStatus.lastStreamLoadTime);
+
         for (int count = 0; count < 200; count++) {
             Assert.assertTrue(list1.get(count).equals(list2.get(count)));
         }
@@ -174,12 +191,12 @@ public void testSerialization() throws Exception {
         back2 = new Backend(1, "a", 2);
         back2.updateOnce(1, 1, 1);
         Assert.assertFalse(back1.equals(back2));
-
+
         Assert.assertEquals("Backend [id=1, host=a, heartbeatPort=1, alive=true]", back1.toString());
-
+
         // 3. delete files
         dis.close();
         file.delete();
     }
-
+
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DiskInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DiskInfoTest.java
new file mode 100644
index 00000000000000..666bedc2feec1c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DiskInfoTest.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.analysis.AccessTestUtil;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.thrift.TStorageMedium;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DiskInfoTest {
+
+    private Catalog catalog;
+
+    private FakeCatalog fakeCatalog;
+    private FakeEditLog fakeEditLog;
+
+    @Before
+    public void setUp() {
+        catalog = AccessTestUtil.fetchAdminCatalog();
+
+        fakeCatalog = new FakeCatalog();
+        fakeEditLog = new FakeEditLog();
+
+        FakeCatalog.setCatalog(catalog);
+        FakeCatalog.setMetaVersion(FeConstants.meta_version);
+        FakeCatalog.setSystemInfo(AccessTestUtil.fetchSystemInfoService());
+    }
+
+    @Test
+    public void testSerialization() throws IOException {
+        // write disk info to file
+        File file = new File("./diskInfoTest");
+        file.createNewFile();
+        DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
+
+        DiskInfo diskInfo1 = new DiskInfo("/disk1");
+        // 1 GB
+        long totalCapacityB = 1024 * 1024 * 1024L;
+        diskInfo1.setTotalCapacityB(totalCapacityB);
+        // 1 MB
+        long dataUsedCapacityB = 1024 * 1024L;
+        diskInfo1.setDataUsedCapacityB(dataUsedCapacityB);
+        // storage medium is not serialized (no @SerializedName)
+        diskInfo1.setStorageMedium(TStorageMedium.SSD);
+
+        diskInfo1.write(dos);
+        dos.flush();
+        dos.close();
+
+        // read disk info from file
+        DataInputStream dis = new DataInputStream(new FileInputStream(file));
+        DiskInfo result = DiskInfo.read(dis);
+
+        // check
+        Assert.assertEquals("/disk1", result.getRootPath());
+        Assert.assertEquals(totalCapacityB, result.getTotalCapacityB());
+        Assert.assertEquals(dataUsedCapacityB, result.getDataUsedCapacityB());
+        Assert.assertNull(result.getStorageMedium());
+    }
+}
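The last assertion documents a subtlety of the new format: `storageMedium` carries no `@SerializedName`, so (assuming GsonUtils' exclusion strategy skips unannotated fields, which is what this test relies on) it never reaches the JSON and comes back `null` even though it was set to SSD before the write. In sketch form:

```java
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TStorageMedium;

public class StorageMediumNotPersistedSketch {
    public static void main(String[] args) {
        DiskInfo disk = new DiskInfo("/disk1");
        disk.setStorageMedium(TStorageMedium.SSD); // no @SerializedName -> dropped
        DiskInfo copy = GsonUtils.GSON.fromJson(GsonUtils.GSON.toJson(disk), DiskInfo.class);
        System.out.println(copy.getStorageMedium()); // null: per the comment in
        // DiskInfo.java, path hash and storage medium are re-reported by the
        // backend instead of being restored from the image.
    }
}
```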