Skip to content
This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
go.uber.org/zap v1.13.0
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/net v0.0.0-20191011234655-491137f69257 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/tools v0.0.0-20191213032237-7093a17b0467 // indirect
google.golang.org/api v0.14.0
google.golang.org/grpc v1.25.1
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ const (
flagSendCredentialOption = "send-credentials-to-tikv"
)

var (
sendCredential bool
)

// DefineFlags adds flags to the flag set corresponding to all backend options.
func DefineFlags(flags *pflag.FlagSet) {
flags.BoolP(flagSendCredentialOption, "c", true,
Expand All @@ -21,6 +25,12 @@ func DefineFlags(flags *pflag.FlagSet) {

// GetBackendOptionsFromFlags obtains the backend options from the flag set.
func GetBackendOptionsFromFlags(flags *pflag.FlagSet) (options BackendOptions, err error) {
sendCredential, err = flags.GetBool(flagSendCredentialOption)
if err != nil {
err = errors.Trace(err)
return
}

if options.S3, err = getBackendOptionsFromS3Flags(flags); err != nil {
return
}
Expand Down
49 changes: 40 additions & 9 deletions pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"io"
"io/ioutil"
"net/http"

"cloud.google.com/go/storage"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/spf13/pflag"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)

Expand All @@ -28,19 +30,18 @@ type GCSBackendOptions struct {
}

func (options *GCSBackendOptions) apply(gcs *backup.GCS) error {
if options.CredentialsFile == "" {
return errors.New("must provide 'gcs.credentials_file'")
}

gcs.Endpoint = options.Endpoint
gcs.StorageClass = options.StorageClass
gcs.PredefinedAcl = options.PredefinedACL

b, err := ioutil.ReadFile(options.CredentialsFile)
if err != nil {
return err
if options.CredentialsFile != "" {
b, err := ioutil.ReadFile(options.CredentialsFile)
if err != nil {
return err
}
gcs.CredentialsBlob = string(b)
}
gcs.CredentialsBlob = string(b)
return nil
}

Expand All @@ -67,7 +68,7 @@ https://console.cloud.google.com/apis/credentials.`)
}

func getBackendOptionsFromGCSFlags(flags *pflag.FlagSet) (options GCSBackendOptions, err error) {
options.Endpoint, err = flags.GetString(s3EndpointOption)
options.Endpoint, err = flags.GetString(gcsEndpointOption)
if err != nil {
err = errors.Trace(err)
return
Expand Down Expand Up @@ -139,15 +140,45 @@ func (s *gcsStorage) FileExists(ctx context.Context, name string) (bool, error)
}

func newGCSStorage(ctx context.Context, gcs *backup.GCS) (*gcsStorage, error) {
return newGCSStorageWithHTTPClient(ctx, gcs, nil)
}

func newGCSStorageWithHTTPClient(ctx context.Context, gcs *backup.GCS, hclient *http.Client) (*gcsStorage, error) {
var clientOps []option.ClientOption
clientOps = append(clientOps, option.WithCredentialsJSON([]byte(gcs.GetCredentialsBlob())))
if gcs.CredentialsBlob == "" {
Comment thread
overvenus marked this conversation as resolved.
creds, err := google.FindDefaultCredentials(ctx, storage.ScopeReadWrite)
if err != nil {
return nil, errors.New(err.Error() + "Or you should provide '--gcs.credentials_file'.")
}
if sendCredential {
if len(creds.JSON) > 0 {
gcs.CredentialsBlob = string(creds.JSON)
} else {
return nil, errors.New(
"You should provide '--gcs.credentials_file' when '--send-credentials-to-tikv' is true")
}
}
clientOps = append(clientOps, option.WithCredentials(creds))
} else {
clientOps = append(clientOps, option.WithCredentialsJSON([]byte(gcs.GetCredentialsBlob())))
}

if gcs.Endpoint != "" {
clientOps = append(clientOps, option.WithEndpoint(gcs.Endpoint))
}
if hclient != nil {
clientOps = append(clientOps, option.WithHTTPClient(hclient))
}
client, err := storage.NewClient(ctx, clientOps...)
if err != nil {
return nil, err
}

if !sendCredential {
// Clear the credentials if exists so that they will not be sent to TiKV
gcs.CredentialsBlob = ""
}

bucket := client.Bucket(gcs.Bucket)
// check bucket exists
_, err = bucket.Attrs(ctx)
Expand Down
130 changes: 118 additions & 12 deletions pkg/storage/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ package storage
import (
"context"
"io/ioutil"
"os"

"github.com/fsouza/fake-gcs-server/fakestorage"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/backup"
)

type testSuite struct{}

var _ = Suite(&testSuite{})
func (r *testStorageSuite) TestGCS(c *C) {
ctx := context.Background()

func (r *testSuite) TestGCS(c *C) {
opts := fakestorage.Options{
NoListener: true,
}
Expand All @@ -22,15 +21,16 @@ func (r *testSuite) TestGCS(c *C) {
bucketName := "testbucket"
server.CreateBucket(bucketName)

stg := &gcsStorage{
gcs: &backup.GCS{
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
},
bucket: server.Client().Bucket(bucketName),
gcs := &backup.GCS{
Bucket: bucketName,
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "Fake Credentials",
}
ctx := context.Background()
stg, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
c.Assert(err, IsNil)

err = stg.Write(ctx, "key", []byte("data"))
c.Assert(err, IsNil)

Expand All @@ -53,3 +53,109 @@ func (r *testSuite) TestGCS(c *C) {
c.Assert(err, IsNil)
c.Assert(exist, IsFalse)
}

func (r *testStorageSuite) TestNewGCSStorage(c *C) {
ctx := context.Background()

opts := fakestorage.Options{
NoListener: true,
}
server, err := fakestorage.NewServerWithOptions(opts)
c.Assert(err, IsNil)
bucketName := "testbucket"
server.CreateBucket(bucketName)

{
sendCredential = true
gcs := &backup.GCS{
Bucket: bucketName,
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "FakeCredentials",
}
_, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, "FakeCredentials")
}

{
sendCredential = false
gcs := &backup.GCS{
Bucket: bucketName,
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "FakeCredentials",
}
_, err := newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, "")
}

{
fakeCredentialsFile, err := ioutil.TempFile("", "fakeCredentialsFile")
c.Assert(err, IsNil)
defer func() {
fakeCredentialsFile.Close()
os.Remove(fakeCredentialsFile.Name())
}()
_, err = fakeCredentialsFile.Write([]byte(`{"type": "service_account"}`))
c.Assert(err, IsNil)
err = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", fakeCredentialsFile.Name())
defer os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS")
c.Assert(err, IsNil)

sendCredential = true
gcs := &backup.GCS{
Bucket: bucketName,
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "",
}
_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, `{"type": "service_account"}`)
}

{
fakeCredentialsFile, err := ioutil.TempFile("", "fakeCredentialsFile")
c.Assert(err, IsNil)
defer func() {
fakeCredentialsFile.Close()
os.Remove(fakeCredentialsFile.Name())
}()
_, err = fakeCredentialsFile.Write([]byte(`{"type": "service_account"}`))
c.Assert(err, IsNil)
err = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", fakeCredentialsFile.Name())
defer os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS")
c.Assert(err, IsNil)

sendCredential = false
gcs := &backup.GCS{
Bucket: bucketName,
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "",
}
_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
c.Assert(err, IsNil)
c.Assert(gcs.CredentialsBlob, Equals, "")
}

{
sendCredential = true
os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS")
gcs := &backup.GCS{
Bucket: bucketName,
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "",
}
_, err = newGCSStorageWithHTTPClient(ctx, gcs, server.HTTPClient())
c.Assert(err, NotNil)
}
}
22 changes: 14 additions & 8 deletions pkg/storage/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,9 @@ func (r *testStorageSuite) TestCreateStorage(c *C) {
c.Assert(s3.Prefix, Equals, "prefix")
c.Assert(s3.Endpoint, Equals, "https://s3.example.com/")

fakeCredentialsFile, err := ioutil.TempFile("", "fakeCredentialsFile")
c.Assert(err, IsNil)
defer func() {
fakeCredentialsFile.Close()
os.Remove(fakeCredentialsFile.Name())
}()
gcsOpt := &BackendOptions{
GCS: GCSBackendOptions{
Endpoint: "https://gcs.example.com/",
CredentialsFile: fakeCredentialsFile.Name(),
Endpoint: "https://gcs.example.com/",
},
}
s, err = ParseBackend("gcs://bucket2/prefix/", gcsOpt)
Expand All @@ -71,13 +64,26 @@ func (r *testStorageSuite) TestCreateStorage(c *C) {
c.Assert(gcs.Bucket, Equals, "bucket2")
c.Assert(gcs.Prefix, Equals, "prefix/")
c.Assert(gcs.Endpoint, Equals, "https://gcs.example.com/")
c.Assert(gcs.CredentialsBlob, Equals, "")

fakeCredentialsFile, err := ioutil.TempFile("", "fakeCredentialsFile")
c.Assert(err, IsNil)
_, err = fakeCredentialsFile.Write([]byte("fakeCredentials"))
c.Assert(err, IsNil)
defer func() {
fakeCredentialsFile.Close()
os.Remove(fakeCredentialsFile.Name())
}()
gcsOpt.GCS.CredentialsFile = fakeCredentialsFile.Name()

s, err = ParseBackend("gcs://bucket/more/prefix/", gcsOpt)
c.Assert(err, IsNil)
gcs = s.GetGcs()
c.Assert(gcs, NotNil)
c.Assert(gcs.Bucket, Equals, "bucket")
c.Assert(gcs.Prefix, Equals, "more/prefix/")
c.Assert(gcs.Endpoint, Equals, "https://gcs.example.com/")
c.Assert(gcs.CredentialsBlob, Equals, "fakeCredentials")
}

func (r *testStorageSuite) TestFormatBackendURL(c *C) {
Expand Down
15 changes: 5 additions & 10 deletions pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import (
"github.com/pingcap/kvproto/pkg/backup"
)

var (
sendCredential bool
)

const (
s3EndpointOption = "s3.endpoint"
s3RegionOption = "s3.region"
Expand Down Expand Up @@ -115,11 +111,6 @@ func defineS3Flags(flags *pflag.FlagSet) {
}

func getBackendOptionsFromS3Flags(flags *pflag.FlagSet) (options S3BackendOptions, err error) {
sendCredential, err = flags.GetBool(flagSendCredentialOption)
if err != nil {
err = errors.Trace(err)
return
}
options.Endpoint, err = flags.GetString(s3EndpointOption)
if err != nil {
err = errors.Trace(err)
Expand Down Expand Up @@ -181,7 +172,11 @@ func newS3Storage(backend *backup.S3) (*S3Storage, error) {
return nil, err
}

if sendCredential && ses.Config.Credentials != nil {
if !sendCredential {
// Clear the credentials if exists so that they will not be sent to TiKV
backend.AccessKey = ""
backend.SecretAccessKey = ""
} else if ses.Config.Credentials != nil {
if qs.AccessKey == "" || qs.SecretAccessKey == "" {
v, cerr := ses.Config.Credentials.Get()
if cerr != nil {
Expand Down
Loading