3 changes: 3 additions & 0 deletions be/src/common/config.h
@@ -355,6 +355,9 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
// You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
// Whether to enable the stream load record function. The default is false.
// False: disable stream load record
CONF_mBool(enable_stream_load_record, "false");
// batch size of stream load record reported to FE
CONF_mInt32(stream_load_record_batch_size, "50");
// expire time of stream load record in rocksdb.
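With these switches in place, enabling the feature on a backend is just a config change. A minimal be.conf sketch (key names are the ones added above; values are illustrative):

    enable_stream_load_record = true
    stream_load_record_batch_size = 50
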
6 changes: 4 additions & 2 deletions be/src/http/action/stream_load.cpp
@@ -162,8 +162,10 @@ void StreamLoadAction::handle(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
str = ctx->prepare_stream_load_record(str);
_sava_stream_load_record(ctx, str);
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_sava_stream_load_record(ctx, str);
}
#endif
// update statistics
streaming_load_requests_total->increment(1);
26 changes: 18 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -21,6 +21,7 @@
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TStorageMedium;

import org.apache.logging.log4j.LogManager;
@@ -30,6 +31,8 @@
import java.io.DataOutput;
import java.io.IOException;

import com.google.gson.annotations.SerializedName;

public class DiskInfo implements Writable {
private static final Logger LOG = LogManager.getLogger(DiskInfo.class);

@@ -40,10 +43,15 @@ public enum DiskState {

private static final long DEFAULT_CAPACITY_B = 1024 * 1024 * 1024 * 1024L; // 1T

@SerializedName("rootPath")
private String rootPath;
@SerializedName("totalCapacityB")
private long totalCapacityB;
@SerializedName("dataUsedCapacityB")
private long dataUsedCapacityB;
@SerializedName("diskAvailableCapacityB")
private long diskAvailableCapacityB;
@SerializedName("state")
private DiskState state;

// path hash and storage medium are reported from Backend and no need to persist
@@ -155,11 +163,8 @@ public String toString() {

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, rootPath);
out.writeLong(totalCapacityB);
out.writeLong(dataUsedCapacityB);
out.writeLong(diskAvailableCapacityB);
Text.writeString(out, state.name());
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public void readFields(DataInput in) throws IOException {
@@ -177,8 +182,13 @@ public void readFields(DataInput in) throws IOException {
}

public static DiskInfo read(DataInput in) throws IOException {
DiskInfo diskInfo = new DiskInfo();
diskInfo.readFields(in);
return diskInfo;
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_99) {
DiskInfo diskInfo = new DiskInfo();
diskInfo.readFields(in);
return diskInfo;
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, DiskInfo.class);
}
}
}
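What the new write()/read() pair stores is a single length-prefixed JSON string: Gson serializes exactly the @SerializedName-annotated fields. A minimal standalone sketch of that round trip (plain Gson, with a hypothetical Disk class standing in for DiskInfo):

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

public class GsonRoundTrip {
    // Hypothetical stand-in for DiskInfo, reduced to two fields.
    static class Disk {
        @SerializedName("rootPath")
        String rootPath;
        @SerializedName("totalCapacityB")
        long totalCapacityB;

        Disk(String rootPath, long totalCapacityB) {
            this.rootPath = rootPath;
            this.totalCapacityB = totalCapacityB;
        }
    }

    public static void main(String[] args) {
        Gson gson = new Gson();
        String json = gson.toJson(new Disk("/data1", 1L << 40));
        System.out.println(json); // {"rootPath":"/data1","totalCapacityB":1099511627776}
        Disk back = gson.fromJson(json, Disk.class);
        System.out.println(back.rootPath); // /data1
    }
}

The FeMetaVersion.VERSION_99 gate in read() keeps old metadata images loadable: images written before this change are still parsed field by field via readFields(), while anything at VERSION_99 or later is read back as JSON.
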
fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -208,6 +208,8 @@ public final class FeMetaVersion {
public static final int VERSION_97 = 97;
// add list partition
public static final int VERSION_98 = 98;
// add stream load audit and change the backend serialization method to json
public static final int VERSION_99 = 99;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_98;
public static final int VERSION_CURRENT = VERSION_99;
}
fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
@@ -17,18 +17,18 @@

package org.apache.doris.load;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.doris.analysis.ShowStreamLoadStmt.StreamLoadState;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.plugin.StreamLoadAuditEvent;
@@ -38,11 +38,14 @@
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStreamLoadRecord;
import org.apache.doris.thrift.TStreamLoadRecordResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.collect.Maps;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
@@ -53,12 +56,14 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import com.google.gson.annotations.SerializedName;

public class StreamLoadRecordMgr extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);

@@ -231,12 +236,14 @@ protected void runAfterCatalogReady() {
TStreamLoadRecord streamLoadItem = entry.getValue();
String startTime = TimeUtils.longToTimeString(streamLoadItem.getStartTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));
String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));
LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +
" status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," +
" unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.",
backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime);
if (LOG.isDebugEnabled()) {
LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," +
" status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," +
" unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.",
backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime);
}

AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
.setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl())
@@ -313,13 +320,14 @@ public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRec
for (Backend backend : backends.values()) {
if (beIdToLastStreamLoad.containsKey(backend.getId())) {
long lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId());
LOG.info("Replay stream load bdbje. backend: {}, last stream load version: {}", backend.getHost(), lastStreamLoadTime);
LOG.info("Replay stream load bdbje. backend: {}, last stream load time: {}", backend.getHost(), lastStreamLoadTime);
Contributor: If stream load record collection is disabled, what will the value of lastStreamLoadTime be?

Contributor (author): Even if the collection process on the BE is disabled, the stream load record manager on the FE still polls for records on schedule; each fetch simply comes back empty. The FE could expose another variable to stop fetching records altogether, but that is one more knob, and to avoid the extra complexity I did not add such a switch on the FE side.

backend.setLastStreamLoadTime(lastStreamLoadTime);
}
}
}

public static class FetchStreamLoadRecord implements Writable {
@SerializedName("beIdToLastStreamLoad")
private Map<Long, Long> beIdToLastStreamLoad;

public FetchStreamLoadRecord(Map<Long, Long> beIdToLastStreamLoad) {
Expand All @@ -336,34 +344,13 @@ public Map<Long, Long> getBeIdToLastStreamLoad() {

@Override
public void write(DataOutput out) throws IOException {
for (Map.Entry<Long, Long> entry : beIdToLastStreamLoad.entrySet()) {
out.writeBoolean(true);
out.writeLong(entry.getKey());
out.writeBoolean(true);
out.writeLong(entry.getValue());
LOG.debug("Write stream load bdbje. key: {}, value: {} ", entry.getKey(), entry.getValue());
}
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public static FetchStreamLoadRecord read(DataInput in) throws IOException {
Map<Long, Long> idToLastStreamLoad = Maps.newHashMap();
int beNum = Catalog.getCurrentSystemInfo().getIdToBackend().size();
for (int i = 0; i < beNum; i++) {
long beId = -1;
long lastStreamLoad = -1;
if (in.readBoolean()) {
beId = in.readLong();
}
if (in.readBoolean()) {
lastStreamLoad = in.readLong();
}
if (beId != -1 && lastStreamLoad != -1) {
idToLastStreamLoad.put(beId, lastStreamLoad);
}
LOG.debug("Read stream load bdbje. key: {}, value: {} ", beId, lastStreamLoad);
}
FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(idToLastStreamLoad);
return fetchStreamLoadRecord;
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, FetchStreamLoadRecord.class);
}
}
}
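The rewritten write()/read() above replaces the hand-rolled boolean/long framing, which had to know the backend count up front, with one JSON document. A standalone sketch of the round trip (plain Gson, with a hypothetical Fetch class standing in for FetchStreamLoadRecord):

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import java.util.HashMap;
import java.util.Map;

public class FetchRoundTrip {
    // Hypothetical stand-in for FetchStreamLoadRecord.
    static class Fetch {
        @SerializedName("beIdToLastStreamLoad")
        Map<Long, Long> beIdToLastStreamLoad = new HashMap<>();
    }

    public static void main(String[] args) {
        Fetch f = new Fetch();
        f.beIdToLastStreamLoad.put(10001L, 1617181920000L);

        Gson gson = new Gson();
        String json = gson.toJson(f);
        System.out.println(json); // {"beIdToLastStreamLoad":{"10001":1617181920000}}

        // The map's size travels with the JSON, so the reader no longer
        // depends on Catalog.getCurrentSystemInfo() for the backend count.
        Fetch back = gson.fromJson(json, Fetch.class);
        System.out.println(back.beIdToLastStreamLoad); // {10001=1617181920000}
    }
}
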
fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -21,8 +21,8 @@
import org.apache.doris.alter.RollupJobV2;
import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.OdbcCatalogResource;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.OdbcCatalogResource;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.ScalarType;
@@ -34,10 +34,21 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table;

import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
Expand All @@ -56,14 +67,7 @@
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;

import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;

/*
* Some utilities about Gson.
@@ -124,7 +128,9 @@ public class GsonUtils {
.registerTypeAdapterFactory(distributionInfoTypeAdapterFactory)
.registerTypeAdapterFactory(resourceTypeAdapterFactory)
.registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
.registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory);
.registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter());

// this instance is thread-safe.
public static final Gson GSON = GSON_BUILDER.create();
@@ -326,6 +332,39 @@ public Multimap<K, V> deserialize(JsonElement json, Type typeOfT, JsonDeserializ
}
}

private static class AtomicBooleanAdapter
implements JsonSerializer<AtomicBoolean>, JsonDeserializer<AtomicBoolean> {

@Override
public AtomicBoolean deserialize(JsonElement jsonElement, Type type,
JsonDeserializationContext jsonDeserializationContext)
throws JsonParseException {
JsonObject jsonObject = jsonElement.getAsJsonObject();
boolean value = jsonObject.get("boolean").getAsBoolean();
return new AtomicBoolean(value);
}

@Override
public JsonElement serialize(AtomicBoolean atomicBoolean, Type type,
JsonSerializationContext jsonSerializationContext) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("boolean", atomicBoolean.get());
return jsonObject;
}
}
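Since the adapter is private but already registered on the shared instance, a caller would round-trip the flag through GsonUtils.GSON. A short usage sketch (the output comments show what the adapter above produces):

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.doris.persist.gson.GsonUtils;

public class AtomicBooleanDemo {
    public static void main(String[] args) {
        String json = GsonUtils.GSON.toJson(new AtomicBoolean(true));
        System.out.println(json); // {"boolean":true}

        AtomicBoolean back = GsonUtils.GSON.fromJson(json, AtomicBoolean.class);
        System.out.println(back.get()); // true
    }
}
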

public final static class ImmutableMapDeserializer implements JsonDeserializer<ImmutableMap<?,?>> {
@Override
public ImmutableMap<?,?> deserialize(final JsonElement json, final Type type,
final JsonDeserializationContext context) throws JsonParseException
{
final Type type2 =
ParameterizedTypeImpl.make(Map.class, ((ParameterizedType) type).getActualTypeArguments(), null);
final Map<?,?> map = context.deserialize(json, type2);
return ImmutableMap.copyOf(map);
}
}
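A matching sketch for the deserializer: writing an ImmutableMap already works through Gson's ordinary Map handling, so only the read side needs help (ImmutableMap has no mutable instance for Gson to populate). Assuming the registration on GsonUtils.GSON shown above:

import java.lang.reflect.Type;

import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.ImmutableMap;
import com.google.gson.reflect.TypeToken;

public class ImmutableMapDemo {
    public static void main(String[] args) {
        // The adapter reads the JSON as a plain Map with the same type
        // arguments, then copies it into an ImmutableMap.
        Type type = new TypeToken<ImmutableMap<String, Long>>() {}.getType();
        ImmutableMap<String, Long> m = GsonUtils.GSON.fromJson("{\"a\":1,\"b\":2}", type);
        System.out.println(m); // {a=1, b=2}
    }
}
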

public static class PostProcessTypeAdapterFactory implements TypeAdapterFactory {

public PostProcessTypeAdapterFactory() {