Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions br/pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func ParseRawURL(rawURL string) (*url.URL, error) {
return u, nil
}

// ParseBackendFromURL constructs a structured backend description from an
// already parsed *url.URL.
//
// It delegates to parseBackend with an empty rawURL, in which case
// parseBackend uses u.String() as the raw form. options is forwarded
// unchanged.
func ParseBackendFromURL(u *url.URL, options *BackendOptions) (*backuppb.StorageBackend, error) {
return parseBackend(u, "", options)
}

// ParseBackend constructs a structured backend description from the
// storage URL.
func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error) {
Expand All @@ -45,6 +51,14 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBack
if err != nil {
return nil, errors.Trace(err)
}
return parseBackend(u, rawURL, options)
}

func parseBackend(u *url.URL, rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error) {
if rawURL == "" {
// try to handle hdfs for ParseBackendFromURL caller
rawURL = u.String()
}
switch u.Scheme {
case "":
absPath, err := filepath.Abs(rawURL)
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds boo

// New creates an ExternalStorage with options.
func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalStorageOptions) (ExternalStorage, error) {
if opts == nil {
opts = &ExternalStorageOptions{}
}
switch backend := backend.Backend.(type) {
case *backuppb.StorageBackend_Local:
if backend.Local == nil {
Expand Down
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
IgnoreLines: v.IgnoreLines,
ColumnAssignments: v.ColumnAssignments,
ColumnsAndUserVars: v.ColumnsAndUserVars,
OnDuplicate: v.OnDuplicate,
Ctx: b.ctx,
}
columnNames := loadDataInfo.initFieldMappings()
Expand All @@ -947,7 +948,7 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
IsLocal: v.IsLocal,
FileLocRef: v.FileLocRef,
OnDuplicate: v.OnDuplicate,
loadDataInfo: loadDataInfo,
}
Expand Down
181 changes: 170 additions & 11 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,27 @@
package executor

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -40,13 +47,15 @@ import (
var (
// null holds the bytes of the literal text "NULL".
null = []byte("NULL")
taskQueueSize = 16 // the maximum number of pending tasks to commit in queue
// InTest is a flag that bypasses GCS authentication in unit tests. When it
// is set, loadFromRemote opens the external storage with NoCredentials
// enabled.
InTest bool
)

// LoadDataExec represents a load data executor.
type LoadDataExec struct {
baseExecutor

IsLocal bool
FileLocRef ast.FileLocRefTp
OnDuplicate ast.OnDuplicateKeyHandlingType
loadDataInfo *LoadDataInfo
}
Expand All @@ -55,29 +64,73 @@ type LoadDataExec struct {
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
// TODO: support load data without local field.
if !e.IsLocal {
if e.FileLocRef == ast.FileLocServer {
return errors.New("Load Data: don't support load data without local field")
}
e.loadDataInfo.OnDuplicate = e.OnDuplicate
// TODO: support lines terminated is "".
if len(e.loadDataInfo.LinesInfo.Terminated) == 0 {
return errors.New("Load Data: don't support load data terminated is nil")
}

sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option isn't closed normal")
}
if e.loadDataInfo.Path == "" {
return errors.New("Load Data: infile path is empty")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
if !e.loadDataInfo.Table.Meta().IsBaseTable() {
return errors.New("can only load data into base tables")
}

switch e.FileLocRef {
case ast.FileLocServer:
panic("FileLocServer should be handled earlier")
case ast.FileLocClient:
// let caller use handleQuerySpecial to read data in this connection
sctx := e.loadDataInfo.ctx
val := sctx.Value(LoadDataVarKey)
if val != nil {
sctx.SetValue(LoadDataVarKey, nil)
return errors.New("Load Data: previous load data option wasn't closed normally")
}
sctx.SetValue(LoadDataVarKey, e.loadDataInfo)
case ast.FileLocRemote:
return e.loadFromRemote(ctx)
}
return nil
}

// loadFromRemote reads the file named by e.loadDataInfo.Path from external
// storage and hands it to LoadDataInfo.Load one '\n'-delimited chunk at a time.
//
// The path is parsed as a storage URL: its last element is used as the file
// name and the remainder as the storage location. A URL that resolves to the
// local backend is rejected with an error.
func (e *LoadDataExec) loadFromRemote(ctx context.Context) error {
	parsed, err := storage.ParseRawURL(e.loadDataInfo.Path)
	if err != nil {
		return err
	}
	// Separate the directory part, which identifies the storage, from the
	// name of the file to open inside it.
	dir, filename := filepath.Split(parsed.Path)
	parsed.Path = dir

	backend, err := storage.ParseBackendFromURL(parsed, nil)
	if err != nil {
		return err
	}
	if backend.GetLocal() != nil {
		return errors.Errorf("Load Data: don't support load data from tidb-server when set REMOTE, path %s", e.loadDataInfo.Path)
	}

	// In unit tests (InTest) the storage is opened without credentials.
	store, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{NoCredentials: InTest})
	if err != nil {
		return err
	}
	remoteFile, err := store.Open(ctx, filename)
	if err != nil {
		return err
	}
	defer remoteFile.Close()

	lineReader := bufio.NewReader(remoteFile)
	nextChunk := func() ([]byte, error) {
		return lineReader.ReadBytes('\n')
	}
	return e.loadDataInfo.Load(ctx, nextChunk)
}

// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil {
Expand All @@ -103,6 +156,7 @@ type CommitTask struct {
}

// LoadDataInfo saves the information of loading data operation.
// TODO: rename it and remove unnecessary public methods.
type LoadDataInfo struct {
*InsertValues

Expand Down Expand Up @@ -132,6 +186,111 @@ type FieldMapping struct {
UserVar *ast.VariableExpr
}

// Load runs a complete load-data job, pulling input chunks from readerFn.
//
// It initialises the queues and the batch size, starts the stop watcher,
// opens a new transaction, and then runs processStream in its own goroutine
// while CommitWork runs on the calling goroutine. The result is the error
// from NewTxn, or otherwise the error from CommitWork, returned only after
// processStream has finished.
func (e *LoadDataInfo) Load(ctx context.Context, readerFn func() ([]byte, error)) error {
	e.InitQueues()
	batchSize := uint64(e.Ctx.GetSessionVars().DMLBatchSize)
	e.SetMaxRowsInBatch(batchSize)
	e.StartStopWatcher()
	// Let the stop watcher goroutine quit on every return path.
	defer e.ForceQuit()

	if err := sessiontxn.NewTxn(ctx, e.Ctx); err != nil {
		return err
	}

	// processStream processes the input data and enqueues commit tasks.
	var wg sync.WaitGroup
	wg.Add(1)
	go processStream(ctx, readerFn, e, &wg)

	commitErr := e.CommitWork(ctx)
	wg.Wait()
	return commitErr
}

// processStream process input stream from network
func processStream(ctx context.Context, readerFn func() ([]byte, error), loadDataInfo *LoadDataInfo, wg *sync.WaitGroup) {
var err error
var shouldBreak bool
var prevData, curData []byte
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("process routine panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
loadDataInfo.ForceQuit()
} else {
loadDataInfo.CloseTaskQueue()
}
wg.Done()
}()
for {
curData, err = readerFn()
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
logutil.Logger(ctx).Error("read data for LOAD DATA failed", zap.Error(err))
break
}
err = nil
Comment thread
sleepymole marked this conversation as resolved.
}
if len(curData) == 0 {
loadDataInfo.Drained = true
shouldBreak = true
if len(prevData) == 0 {
break
}
}
select {
case <-loadDataInfo.QuitCh:
err = errors.New("processStream forced to quit")
default:
}
if err != nil {
break
}
// prepare batch and enqueue task
prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
if err != nil {
break
}
if shouldBreak {
break
}
}
if err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
if err = loadDataInfo.EnqOneTask(ctx); err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
return
}
}

// insertDataWithCommit feeds prevData and curData to loadDataInfo.InsertData
// and, each time InsertData reports that the batch limit was reached,
// enqueues a commit task and retries with the remaining bytes.
//
// It returns the bytes InsertData left unconsumed. If InsertData fails the
// returned slice is nil; if enqueueing fails the unconsumed bytes are
// returned together with the error.
func insertDataWithCommit(ctx context.Context, prevData,
	curData []byte, loadDataInfo *LoadDataInfo) ([]byte, error) {
	for {
		rest, limitReached, err := loadDataInfo.InsertData(ctx, prevData, curData)
		if err != nil {
			return nil, err
		}
		if !limitReached {
			return rest, nil
		}
		// push into commit task queue
		if enqErr := loadDataInfo.EnqOneTask(ctx); enqErr != nil {
			return rest, enqErr
		}
		// The leftover becomes the current input of the next round.
		prevData, curData = nil, rest
	}
}

// reorderColumns reorder the e.insertColumns according to the order of columnNames
// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name.
func (e *LoadDataInfo) reorderColumns(columnNames []string) error {
Expand Down
18 changes: 18 additions & 0 deletions executor/loadremotetest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

# Go test target for the loadremotetest package. It is built from the three
# listed test sources and depends on the executor, kv and testkit packages
# plus the fake GCS server, testify suite and goleak libraries.
go_test(
name = "loadremotetest_test",
srcs = [
"main_test.go",
"one_csv_test.go",
"util_test.go",
],
deps = [
"//executor",
"//kv",
"//testkit",
"@com_github_fsouza_fake_gcs_server//fakestorage",
"@com_github_stretchr_testify//suite",
"@org_uber_go_goleak//:goleak",
],
)
33 changes: 33 additions & 0 deletions executor/loadremotetest/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package loadremotetest

import (
"testing"

"go.uber.org/goleak"
)

// TestMain runs the package's tests through goleak.VerifyTestMain. Goroutines
// whose top function is one of the names listed below are ignored by the
// leak check.
func TestMain(m *testing.M) {
	ignoredTopFunctions := []string{
		"github.com/golang/glog.(*loggingT).flushDaemon",
		"github.com/lestrrat-go/httprc.runFetchWorker",
		"github.com/tikv/client-go/v2/txnkv/transaction.keepAlive",
		"go.opencensus.io/stats/view.(*worker).start",
		"internal/poll.runtime_pollWait",
		"net.(*netFD).connect.func2",
	}
	opts := make([]goleak.Option, 0, len(ignoredTopFunctions))
	for _, fn := range ignoredTopFunctions {
		opts = append(opts, goleak.IgnoreTopFunction(fn))
	}
	goleak.VerifyTestMain(m, opts...)
}
Loading