@@ -26,7 +26,10 @@ under the License.

# SHOW CREATE ROUTINE LOAD
## description
The statement is used to show the routine load job creation statement of user-defined
This statement is used to show the creation statement of a user-defined routine load job.

The kafka partitions and offsets in the result show the partitions currently being consumed and the next offsets to be consumed.

grammar:
SHOW [ALL] CREATE ROUTINE LOAD for load_name;

@@ -39,4 +42,4 @@ under the License.
SHOW CREATE ROUTINE LOAD for test_load
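
For orientation, a hedged sketch of what the result might look like with the new metadata (the column names JobId, JobName and CreateStmt come from ShowCreateRoutineLoadStmt in this change; the id and the truncated statement are illustrative):

    +-------+-----------+--------------------------------------+
    | JobId | JobName   | CreateStmt                           |
    +-------+-----------+--------------------------------------+
    | 10005 | test_load | CREATE ROUTINE LOAD test_load ON ... |
    +-------+-----------+--------------------------------------+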

## keyword
SHOW,CREATE,ROUTINE,LOAD
SHOW,CREATE,ROUTINE,LOAD
@@ -72,7 +72,9 @@ Syntax:

1. `kafka_partitions`
2. `kafka_offsets`
3. Custom property, such as `property.group.id`
3. `kafka_broker_list`
4. `kafka_topic`
5. Custom property, such as `property.group.id`
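
As an illustration of the newly alterable properties, a hedged example that points a job at a new cluster and topic (the job name and addresses are made up; per the check in KafkaRoutineLoadJob below, the job must be in the PAUSED state):

    ALTER ROUTINE LOAD FOR example_job
    FROM kafka
    (
        "kafka_broker_list" = "new_broker1:9092,new_broker2:9092",
        "kafka_topic" = "new_topic"
    );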

Notice:

@@ -26,7 +26,9 @@ under the License.

# SHOW CREATE ROUTINE LOAD
## description
This statement is used to show the creation statement of a routine load job
This statement is used to show the creation statement of a routine load job.
The kafka partitions and offsets in the result show the partitions currently being consumed and the next offsets to be consumed.

Syntax:
SHOW [ALL] CREATE ROUTINE LOAD for load_name;

@@ -39,4 +41,4 @@ under the License.
SHOW CREATE ROUTINE LOAD for test_load

## keyword
SHOW,CREATE,ROUTINE,LOAD
SHOW,CREATE,ROUTINE,LOAD
@@ -76,7 +76,9 @@ under the License.

1. `kafka_partitions`
2. `kafka_offsets`
3. Custom property, such as `property.group.id`
3. `kafka_broker_list`
4. `kafka_topic`
5. Custom property, such as `property.group.id`

Note:

@@ -51,6 +51,8 @@ public class RoutineLoadDataSourceProperties {
.build();

private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
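
For readers outside the codebase, a minimal self-contained sketch of the whitelist pattern this set implements (the class and method names here are illustrative, not the actual Doris validation code; the error message echoes the one asserted in the tests below):

```java
import com.google.common.collect.ImmutableSet;

public class PropertyWhitelistSketch {
    // Alterable Kafka data-source properties are enumerated once, as above.
    private static final ImmutableSet<String> CONFIGURABLE = ImmutableSet.of(
            "kafka_broker_list", "kafka_topic", "kafka_partitions", "kafka_offsets");

    static void check(String key) {
        if (!CONFIGURABLE.contains(key)) {
            throw new IllegalArgumentException(key + " is invalid kafka property");
        }
    }

    public static void main(String[] args) {
        check("kafka_topic"); // passes after this change; previously rejected
    }
}
```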
@@ -34,6 +34,10 @@ public Separator(String separator) {
this.separator = null;
}

// Returns the separator exactly as the user originally wrote it (before any
// conversion), so that SHOW CREATE ROUTINE LOAD can round-trip the statement.
public String getOriSeparator() {
return oriSeparator;
}

public String getSeparator() {
return separator;
}
@@ -27,9 +27,9 @@ public class ShowCreateRoutineLoadStmt extends ShowStmt {

private static final ShowResultSetMetaData META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("Routine Load Id", ScalarType.createVarchar(20)))
.addColumn(new Column("Routine Load Name", ScalarType.createVarchar(20)))
.addColumn(new Column("Create Routine Load", ScalarType.createVarchar(30)))
.addColumn(new Column("JobId", ScalarType.createVarchar(128)))
.addColumn(new Column("JobName", ScalarType.createVarchar(128)))
.addColumn(new Column("CreateStmt", ScalarType.createVarchar(65535)))
.build();

private final LabelName labelName;
@@ -22,13 +22,14 @@
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TKafkaRLTaskProgress;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -128,6 +129,26 @@ public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws
}
}

// Return <partition, display-offset> pairs. Sentinel offsets are mapped to readable
// names; a positive offset is shown as the next offset to be consumed, or as the
// already-consumed offset (value - 1) when alreadyConsumed is true.
public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsumed) {
List<Pair<Integer, String>> pairs = Lists.newArrayList();
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
if (entry.getValue() == 0) {
pairs.add(Pair.create(entry.getKey(), OFFSET_ZERO));
} else if (entry.getValue() == -1) {
pairs.add(Pair.create(entry.getKey(), OFFSET_END));
} else if (entry.getValue() == -2) {
pairs.add(Pair.create(entry.getKey(), OFFSET_BEGINNING));
} else {
long offset = entry.getValue();
if (alreadyConsumed) {
offset -= 1;
}
pairs.add(Pair.create(entry.getKey(), "" + offset));
}
}
return pairs;
}
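
A self-contained sketch of the sentinel mapping above, runnable in isolation (it assumes OFFSET_ZERO, OFFSET_END and OFFSET_BEGINNING are human-readable labels of those names, which is not shown in this diff):

```java
public class OffsetLabelSketch {
    // Mirrors getPartitionOffsetPairs: 0 -> OFFSET_ZERO, -1 -> OFFSET_END,
    // -2 -> OFFSET_BEGINNING; any other value is a real Kafka offset.
    static String label(long offset, boolean alreadyConsumed) {
        if (offset == 0) {
            return "OFFSET_ZERO";
        } else if (offset == -1) {
            return "OFFSET_END";
        } else if (offset == -2) {
            return "OFFSET_BEGINNING";
        }
        // Stored offsets point at the next message to consume, so the
        // already-consumed position is one less.
        return String.valueOf(alreadyConsumed ? offset - 1 : offset);
    }

    public static void main(String[] args) {
        System.out.println(label(-2, false)); // OFFSET_BEGINNING
        System.out.println(label(42, false)); // 42, the next offset to consume
        System.out.println(label(42, true));  // 41, the last consumed offset
    }
}
```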

@Override
public String toString() {
Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();
@@ -55,15 +55,16 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.Strings;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.UUID;

@@ -554,7 +555,6 @@ public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException {
throw new DdlException("Only supports modification of PAUSED jobs");
}


modifyPropertiesInternal(jobProperties, dataSourceProperties);

AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
@@ -593,15 +593,23 @@ private void modifyPropertiesInternal(Map<String, String> jobProperties,
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}

if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}

if (!jobProperties.isEmpty()) {
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
}

if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
// modify broker list and topic
if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaBrokerList())) {
this.brokerList = dataSourceProperties.getKafkaBrokerList();
}
if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaTopic())) {
this.topic = dataSourceProperties.getKafkaTopic();
}

LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
@@ -39,6 +39,7 @@
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -1325,10 +1326,10 @@ public String getShowCreateInfo() {
// 4.load_properties
// 4.1.column_separator
if (columnSeparator != null) {
sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getSeparator()).append("\",\n");
sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getOriSeparator()).append("\",\n");
}
// 4.2.columns_mapping
if (columnDescs != null) {
if (columnDescs != null && !columnDescs.descs.isEmpty()) {
sb.append("COLUMNS(").append(Joiner.on(",").join(columnDescs.descs)).append("),\n");
}
// 4.3.where_predicates
@@ -1352,36 +1353,51 @@ public String getShowCreateInfo() {
sb.append("PRECEDING FILTER ").append(precedingFilter.toSql()).append(",\n");
}
// remove the last ,
if (",".equals(sb.charAt(sb.length() - 2))) {
if (sb.charAt(sb.length() - 2) == ',') {
sb.replace(sb.length() - 2, sb.length() - 1, "");
}
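
This is a real bug fix, not a style change: the old condition compared a String against an autoboxed char, which can never be equal. A minimal demonstration:

```java
public class CharEqualsSketch {
    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder("x,\n");
        // charAt returns a char; it is boxed to Character, which a String never equals.
        System.out.println(",".equals(sb.charAt(sb.length() - 2))); // always false
        // Comparing chars directly does what the author intended.
        System.out.println(sb.charAt(sb.length() - 2) == ',');      // true
    }
}
```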
// 5.job_properties
// 5.job_properties. See PROPERTIES_SET of CreateRoutineLoadStmt
sb.append("PROPERTIES\n(\n");
appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
appendProperties(sb, PROPS_FORMAT, getFormat(), false);
appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false);
appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
appendProperties(sb, PROPS_FUZZY_PARSE, isFuzzyParse(), false);
appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), true);
appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true);
sb.append(")\n");
// 6. data_source
sb.append("FROM ").append(dataSourceType).append("\n");
// 7. data_source_properties
sb.append("(\n");
getDataSourceProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
getCustomProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
// remove the last ,
if (progress instanceof KafkaProgress) {
// Append partitions and offsets.
// The offsets shown are the next offsets to be consumed.
List<Pair<Integer, String>> pairs = ((KafkaProgress) progress).getPartitionOffsetPairs(false);
Contributor: Is the offset shown here the one specified when the task was created, or the one that has already been consumed?

Contributor Author: The offset to be consumed. I will add a comment and document it.
appendProperties(sb, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
Joiner.on(", ").join(pairs.stream().map(p -> p.first).toArray()), false);
appendProperties(sb, CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY,
Joiner.on(", ").join(pairs.stream().map(p -> p.second).toArray()), false);
}
// remove the last ","
sb.replace(sb.length() - 2, sb.length() - 1, "");
sb.append(");");
return sb.toString();
}

private static void appendProperties(StringBuilder sb, String key, Object value, boolean end) {
if (value == null || Strings.isNullOrEmpty(value.toString())) {
return;
}
sb.append("\"").append(key).append("\"").append(" = ").append("\"").append(value).append("\"");
if (!end) {
sb.append(",\n");
@@ -40,7 +40,7 @@ public RoutineLoadProgress(LoadDataSourceType loadDataSourceType) {
abstract void update(RLTaskTxnCommitAttachment attachment);

abstract String toJsonString();

public static RoutineLoadProgress read(DataInput in) throws IOException {
RoutineLoadProgress progress = null;
LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
@@ -125,6 +125,7 @@ public void testUnsupportedProperties() {
}
}

// altering the topic is now supported
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
@@ -138,9 +139,8 @@

try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka property"));
Assert.fail();
} catch (UserException e) {
e.printStackTrace();
Assert.fail();
@@ -291,7 +291,7 @@ public void testAlterNormal() throws UserException {

@Test
public void testAlterAbnormal() {
// can not set KAFKA_BROKER_LIST_PROPERTY
// setting KAFKA_BROKER_LIST_PROPERTY is now supported
Map<String, String> properties = Maps.newHashMap();
properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080");
properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
@@ -301,7 +301,7 @@ public void testAlterAbnormal() {
dsProperties.analyze();
Assert.fail();
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("kafka_broker_list is invalid kafka property"));
Assert.assertTrue(e.getMessage().contains("kafka_default_offsets can only be set to OFFSET_BEGINNING, OFFSET_END or date time"));
}

// can not set datetime formatted offset and integer offset together
@@ -31,14 +31,14 @@
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;

import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Map;

@@ -328,22 +328,24 @@ public void testGetShowCreateInfo() throws UserException {
"PROPERTIES\n" +
"(\n" +
"\"desired_concurrent_number\" = \"0\",\n" +
"\"max_error_number\" = \"10\",\n" +
"\"max_batch_interval\" = \"10\",\n" +
"\"max_batch_rows\" = \"10\",\n" +
"\"max_batch_size\" = \"104857600\",\n" +
"\"max_error_number\" = \"10\",\n" +
"\"strict_mode\" = \"false\",\n" +
"\"timezone\" = \"Asia/Shanghai\",\n" +
"\"format\" = \"csv\",\n" +
"\"jsonpaths\" = \"\",\n" +
"\"strip_outer_array\" = \"false\",\n" +
"\"json_root\" = \"\"\n" +
"\"num_as_string\" = \"false\",\n" +
"\"fuzzy_parse\" = \"false\",\n" +
"\"strict_mode\" = \"false\",\n" +
"\"timezone\" = \"Asia/Shanghai\",\n" +
"\"exec_mem_limit\" = \"2147483648\"\n" +
")\n" +
"FROM KAFKA\n" +
"(\n" +
"\"kafka_broker_list\" = \"localhost:9092\",\n" +
"\"kafka_topic\" = \"test_topic\"\n" +
");";
System.out.println(showCreateInfo);
Assert.assertEquals(expect, showCreateInfo);
}
