35 changes: 35 additions & 0 deletions fe/fe-core/pom.xml
@@ -266,6 +266,7 @@ under the License.
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@@ -470,6 +471,40 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
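Both canal artifacts pull in logback transitively, so the exclusions above appear intended to keep a second SLF4J backend off the classpath (this pom already binds log4j via log4j-slf4j-impl and slf4j-log4j12). A quick way to confirm logback stays excluded after the change, assuming it is run from the fe/ module, is:

    mvn dependency:tree -Dincludes=ch.qos.logback
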
87 changes: 85 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
@@ -234,7 +234,7 @@ parser code {:

// Total keywords of doris
terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_APPEND, KW_AS, KW_ASC, KW_AUTHORS, KW_ARRAY,
KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BITMAP, KW_BITMAP_UNION, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN,
KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BINLOG, KW_BITMAP, KW_BITMAP_UNION, KW_BOOLEAN, KW_BROKER, KW_BACKENDS, KW_BY, KW_BUILTIN,
KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CHECK, KW_CLUSTER, KW_CLUSTERS,
KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLON, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED,
KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER,
@@ -247,7 +247,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
KW_HASH, KW_HAVING, KW_HDFS, KW_HELP,KW_HLL, KW_HLL_UNION, KW_HOUR, KW_HUB,
KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, KW_INSTALL,
KW_INNER, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_IS, KW_ISNULL, KW_ISOLATION,
KW_JOIN,
KW_JOB, KW_JOIN,
KW_KEY, KW_KEYS, KW_KILL,
KW_LABEL, KW_LARGEINT, KW_LAST, KW_LEFT, KW_LESS, KW_LEVEL, KW_LIKE, KW_LIMIT, KW_LINK, KW_LIST, KW_LOAD,
KW_LOCAL, KW_LOCATION,
@@ -485,6 +485,15 @@ nonterminal String keyword, ident, ident_or_text, variable_name, text_or_passwor
collation_name_or_default, type_func_name_keyword, type_function_name, opt_file_format, time_unit,
literal_or_ident;

// sync job
nonterminal List<ChannelDescription> channel_desc_list;
nonterminal ChannelDescription channel_desc;
nonterminal BinlogDesc binlog_desc;
nonterminal ResumeSyncJobStmt resume_sync_job_stmt;
nonterminal PauseSyncJobStmt pause_sync_job_stmt;
nonterminal StopSyncJobStmt stop_sync_job_stmt;
nonterminal JobName job_name;

nonterminal String opt_db, procedure_or_function, opt_comment, opt_engine;
nonterminal ColumnDef.DefaultValue opt_default_value;
nonterminal Boolean opt_if_exists, opt_if_not_exists;
@@ -672,6 +681,12 @@ stmt ::=
{: RESULT = stmt; :}
| resume_routine_load_stmt : stmt
{: RESULT = stmt; :}
| pause_sync_job_stmt : stmt
{: RESULT = stmt; :}
| resume_sync_job_stmt : stmt
{: RESULT = stmt; :}
| stop_sync_job_stmt : stmt
{: RESULT = stmt; :}
| stop_routine_load_stmt : stmt
{: RESULT = stmt; :}
| show_routine_load_stmt : stmt
@@ -1226,6 +1241,69 @@ create_stmt ::=
{:
RESULT = new CreateEncryptKeyStmt(keyName, keyString);
:}
/* sync job */
| KW_CREATE KW_SYNC ident:db DOT ident_or_text:jobName LPAREN channel_desc_list:channelDescList RPAREN binlog_desc:binlog opt_properties:properties
{:
RESULT = new CreateDataSyncJobStmt(jobName, db, channelDescList, binlog, properties);
:}
;

channel_desc_list ::=
channel_desc:desc
{:
RESULT = Lists.newArrayList(desc);
:}
| channel_desc_list:list COMMA channel_desc:desc
{:
list.add(desc);
RESULT = list;
:}
;

channel_desc ::=
KW_FROM ident:srcDatabase DOT ident:srcTableName KW_INTO ident:desTableName opt_partition_names:partitionNames opt_col_list:colList
{:
RESULT = new ChannelDescription(srcDatabase, srcTableName, desTableName, partitionNames, colList);
:}
;

binlog_desc ::=
KW_FROM KW_BINLOG LPAREN key_value_map:properties RPAREN
{:
RESULT = new BinlogDesc(properties);
:}
;

resume_sync_job_stmt ::=
KW_RESUME KW_SYNC KW_JOB job_name:jobName
{:
RESULT = new ResumeSyncJobStmt(jobName);
:}
;

pause_sync_job_stmt ::=
KW_PAUSE KW_SYNC KW_JOB job_name:jobName
{:
RESULT = new PauseSyncJobStmt(jobName);
:}
;

stop_sync_job_stmt ::=
KW_STOP KW_SYNC KW_JOB job_name:jobName
{:
RESULT = new StopSyncJobStmt(jobName);
:}
;

job_name ::=
ident:jobName
{:
RESULT = new JobName("", jobName);
:}
| ident:db DOT ident:jobName
{:
RESULT = new JobName(db, jobName);
:}
;

opt_aggregate ::=
@@ -2640,6 +2718,11 @@ show_param ::=
{:
RESULT = new ShowEncryptKeysStmt(dbName, parser.wild);
:}
/* Show Sync Job */
| KW_SYNC KW_JOB opt_db:dbName
{:
RESULT = new ShowSyncJobStmt(dbName);
:}
;

opt_tmp ::=
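Taken together, the new productions accept statements of the following shape. This is an illustrative sketch only: the database, table, and job names are invented, the binlog properties mirror the example comment in BinlogDesc.java below, and the SHOW form assumes the pre-existing opt_db rule takes its usual FROM clause:

    CREATE SYNC db1.sync_job1
    (
        FROM mysql_db.src_tbl INTO des_tbl
    )
    FROM BINLOG
    (
        "type" = "canal",
        "canal.server.ip" = "127.0.0.1",
        "canal.server.port" = "11111",
        "canal.destination" = "example",
        "canal.username" = "canal",
        "canal.password" = "canal"
    );

    PAUSE SYNC JOB db1.sync_job1;
    RESUME SYNC JOB db1.sync_job1;
    STOP SYNC JOB db1.sync_job1;
    SHOW SYNC JOB FROM db1;
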
67 changes: 67 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/BinlogDesc.java
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.sync.DataSyncJobType;

import com.google.common.collect.Maps;

import java.util.Map;

// Binlog descriptor
//
// Example:
// FROM BINLOG
// (
// "type" = "canal",
// "canal.server.ip" = "127.0.0.1",
// "canal.server.port" = "11111",
// "canal.destination" = "example",
// "canal.username" = "canal",
// "canal.password" = "canal"
// )

public class BinlogDesc {
private static final String TYPE = "type";
private Map<String, String> properties;
private DataSyncJobType dataSyncJobType;

public BinlogDesc(Map<String, String> properties) {
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newHashMap();
}
this.dataSyncJobType = DataSyncJobType.UNKNOWN;
}

public Map<String, String> getProperties() {
return properties;
}

public DataSyncJobType getDataSyncJobType() {
return dataSyncJobType;
}

public void analyze() throws AnalysisException {
if (!properties.containsKey(TYPE)) {
throw new AnalysisException("Binlog properties must contain property `type`");
}
dataSyncJobType = DataSyncJobType.fromString(properties.get(TYPE));
}
}
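
A minimal usage sketch, not part of the patch: the properties map stands in for what the parser's key_value_map rule produces, and the harness class is hypothetical. It shows analyze() resolving the job type from the mandatory `type` property:

    import org.apache.doris.analysis.BinlogDesc;
    import org.apache.doris.common.AnalysisException;

    import com.google.common.collect.Maps;

    import java.util.Map;

    public class BinlogDescSketch {
        public static void main(String[] args) throws AnalysisException {
            Map<String, String> properties = Maps.newHashMap();
            properties.put("type", "canal"); // mandatory; analyze() throws without it
            properties.put("canal.server.ip", "127.0.0.1");
            properties.put("canal.server.port", "11111");

            BinlogDesc desc = new BinlogDesc(properties);
            desc.analyze(); // sets dataSyncJobType from the `type` property
            System.out.println(desc.getDataSyncJobType()); // prints the resolved DataSyncJobType
        }
    }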
148 changes: 148 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/ChannelDescription.java
@@ -0,0 +1,148 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Strings;
import com.google.gson.annotations.SerializedName;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Used to describe the channel info in a data sync job
// channel_desc:
// FROM mysql_db.src_tbl INTO doris_db.des_tbl
// [PARTITION (p1, p2)]
// [(col1, ...)]
// [KEEP ORDER]
public class ChannelDescription implements Writable {
private static final Logger LOG = LogManager.getLogger(ChannelDescription.class);

@SerializedName(value = "srcDatabase")
private final String srcDatabase;
@SerializedName(value = "srcTableName")
private final String srcTableName;
@SerializedName(value = "targetTable")
private final String targetTable;
@SerializedName(value = "partitionNames")
private final PartitionNames partitionNames;
// column names of source table
@SerializedName(value = "colNames")
private final List<String> colNames;

public ChannelDescription(String srcDatabase, String srcTableName, String targetTable, PartitionNames partitionNames, List<String> colNames) {
this.srcDatabase = srcDatabase;
this.srcTableName = srcTableName;
this.targetTable = targetTable;
this.partitionNames = partitionNames;
this.colNames = colNames;
}

public List<String> getColNames() {
if (colNames == null || colNames.isEmpty()) {
return null;
}
return colNames;
}

public void analyze(String fullDbName) throws AnalysisException {
if (Strings.isNullOrEmpty(srcDatabase)) {
throw new AnalysisException("No source database in channel description.");
}

if (Strings.isNullOrEmpty(srcTableName)) {
throw new AnalysisException("No source table in channel description.");
}

checkAuth(fullDbName);

if (partitionNames != null) {
partitionNames.analyze(null);
}

analyzeColumns();
}

private void checkAuth(String fullDbName) throws AnalysisException {
if (Strings.isNullOrEmpty(targetTable)) {
throw new AnalysisException("No target table is assigned in channel description.");
}

// check target table auth
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, targetTable,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), targetTable);
}
}

private void analyzeColumns() throws AnalysisException {
Set<String> columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
if (colNames != null && !colNames.isEmpty()) {
for (String columnName : colNames) {
if (!columnNames.add(columnName)) {
throw new AnalysisException("Duplicate column: " + columnName);
}
}
}
}

public String getTargetTable() {
return targetTable;
}

public String getSrcDatabase() {
return srcDatabase;
}

public String getSrcTableName() {
return srcTableName;
}

public PartitionNames getPartitionNames() {
return partitionNames;
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public static ChannelDescription read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ChannelDescription.class);
}
}
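
And a round-trip sketch of the GSON-backed Writable implementation, again hypothetical test scaffolding rather than patch code:

    import org.apache.doris.analysis.ChannelDescription;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class ChannelDescriptionSketch {
        public static void main(String[] args) throws IOException {
            ChannelDescription original =
                    new ChannelDescription("mysql_db", "src_tbl", "des_tbl", null, null);

            // write() stores the object as a GSON JSON string via Text
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));

            // read() parses the same JSON back into a new instance
            ChannelDescription copy = ChannelDescription.read(
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            System.out.println(copy.getTargetTable()); // prints des_tbl
        }
    }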