BigQuery: storage write API connector#4220
The top 2 commits can be squashed, I think.
It would be nice to rewrite the test comments as given/when/then logs.
Can we have a constructor for tests instead of |
    if maxInFlight, err = conf.FieldMaxInFlight(); err != nil {
        return
    }
    if batchPolicy, err = conf.FieldBatchPolicy(bqwaFieldBatching); err != nil {
        return
    }
Try preserving order where it makes sense
    When batching is enabled the table name is resolved from the first message in
    each batch; all messages in the same batch are written to that table.
    `).
nit: for maintainability, it's best to put one sentence per line.
It's handy to support |
|
I'd suggest doing what we did with AWS: extract this into a separate package. Retroactive regrouping should follow. Once we have a separate package, please split the utilities into separate files.
Jeffail left a comment:
Did a foundation pass to sanity-check whether the new code sits on the existing primitives cleanly. Most of it does. I'm flagging two spots where the implementation works around the foundation rather than leveraging it, plus a few smaller observations that are more judgment calls.
|
Commits
Review LGTM |
    func bigQueryWriteAPISpec() *service.ConfigSpec {
        return service.NewConfigSpec().
            Version("4.87.0").
Update this to 4.90.0 if it's going out in this release?
    Name: new("synthetic.proto"),
    Syntax: new("proto2"),
These calls won't compile. Go's built-in new(T) requires T to be a type, not a value, so new("synthetic.proto") and new("proto2") are invalid expressions. The same pattern is repeated in output_test.go (e.g. new("BrokenRef"), new(int32(1)), etc.).
Use a small generic helper such as func ptr[T any](v T) *T { return &v } and call ptr("synthetic.proto"), or use proto.String(...) / proto.Int32(...) from google.golang.org/protobuf/proto. As written, the package fails to build.
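A minimal sketch of the generic helper suggested above. The `fileDescriptor` type is a hypothetical stand-in for `descriptorpb.FileDescriptorProto`, used only so the example runs without extra dependencies. Note that whether `new("...")` builds depends on the module's Go version: to my knowledge, Go 1.26 and later accept an expression operand for `new`, while earlier toolchains require a type. The helper below works on any toolchain with generics (Go 1.18+).

```go
package main

import "fmt"

// ptr returns a pointer to a copy of v. This is the helper proposed in the
// review comment; it is not code taken from the PR.
func ptr[T any](v T) *T { return &v }

// fileDescriptor is a hypothetical stand-in for a protobuf descriptor type
// that models optional values as pointer fields.
type fileDescriptor struct {
	Name   *string
	Syntax *string
}

func main() {
	fd := fileDescriptor{
		Name:   ptr("synthetic.proto"),
		Syntax: ptr("proto2"),
	}
	number := ptr(int32(1)) // same helper covers *int32 fields

	fmt.Println(*fd.Name, *fd.Syntax, *number)
}
```

In the real code, `proto.String` and `proto.Int32` do the same job without a local helper.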
    gcp_bigquery ,output ,GCP BigQuery ,3.55.0 ,certified ,n ,y ,y
    gcp_bigquery_select ,input ,GCP BigQuery ,3.63.0 ,certified ,n ,y ,y
    gcp_bigquery_select ,processor ,GCP BigQuery ,3.64.0 ,certified ,n ,y ,y
    gcp_bigquery_write_api ,output ,GCP BigQuery ,4.87.0 ,enterprise ,n ,y ,y
Version mismatch: this row lists 4.87.0, but the component spec uses Version("4.90.0") and the generated docs say Introduced in version 4.90.0. Pick one and align both places so users see consistent version metadata.
    if o.stopSweep != nil {
        close(o.stopSweep)
        o.stopSweep = nil
    }

    // Wait for the sweep goroutine to finish before closing streams/clients
    // so it does not access shared state after shutdown.
    o.sweepWg.Wait()
Data race on o.stopSweep between Close() and sweepIdleStreams() that can deadlock shutdown.
Close() writes to o.stopSweep (close + nil) under connMu.Lock(), but sweepIdleStreams() reads o.stopSweep from inside its select without holding connMu. If sweepIdleStreams happens to take the <-ticker.C case at the same moment Close() runs, the next iteration may observe the nil store and end up blocked forever on <-nil_chan (a receive from a nil channel blocks forever). Close() then hangs on o.sweepWg.Wait().
The godev guidance recommends sync.Once for shutdown signals. Switching to sync.Once.Do(func() { close(o.stopSweep) }) and never nil'ing the field eliminates both the race and the double-close concern.
    Name: new("synthetic.proto"),
    Syntax: new("proto2"),
The Go builtin new takes a type and returns *T — it cannot accept a value. new("synthetic.proto") and new("proto2") will not compile (string literals are not type expressions). There is no shadowing helper named new defined in this package.
The intent here is to obtain a *string for the Name and Syntax fields of descriptorpb.FileDescriptorProto. Use the standard helpers from the already-imported google.golang.org/protobuf/proto package, e.g. proto.String("synthetic.proto") and proto.String("proto2").
This is the same pattern as the godev guidance — new(X) is reserved for zero-value pointers to types, not for "make a pointer to this value".
            Name: new("BrokenRef"),
            Field: []*descriptorpb.FieldDescriptorProto{
                {
                    Name: new("ptr"),
                    Number: new(int32(1)),
                    Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
                    TypeName: new(".nonexistent.Missing"),
                    Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
                },
            },
        }

        md, err := descriptorProtoToMessageDescriptor(dp)
        require.Error(t, err)
        assert.Nil(t, md)
        assert.Contains(t, err.Error(), "creating file descriptor from normalized proto")
    })

    t.Run("duplicate field numbers", func(t *testing.T) {
        dp := &descriptorpb.DescriptorProto{
            Name: new("DuplicateNumbers"),
            Field: []*descriptorpb.FieldDescriptorProto{
                {
                    Name: new("alpha"),
                    Number: new(int32(1)),
                    Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
                    Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
                },
                {
                    Name: new("beta"),
                    Number: new(int32(1)),
Same new(...) compile-error pattern as in output.go: new("BrokenRef"), new("ptr"), new(int32(1)), new(".nonexistent.Missing"), new("DuplicateNumbers"), new("alpha"), new("beta") (lines 227, 230, 231, 233, 247, 250, 251, 256, 257) all pass a value to the builtin new, which requires a type. None of these will compile — int32(1) is a conversion expression, not a type, and string literals are not types.
Replace all of these with the appropriate helpers from google.golang.org/protobuf/proto: proto.String(...) for the *string fields (Name, TypeName) and proto.Int32(1) for the *int32 Number field.
These tests cannot run as written, so the error-path coverage they're meant to add is not actually exercised.
    func (o *bigQueryWriteAPIOutput) sweepIdleStreams(stop <-chan struct{}) {
        defer o.sweepWg.Done()

        ticker := time.NewTicker(o.conf.StreamSweepInterval)
time.NewTicker panics if d <= 0. The stream_sweep_interval field is exposed as a YAML-configurable service.NewDurationField with no minimum validation, so a user setting stream_sweep_interval: 0s (or a negative duration) makes Connect() crash the process when this goroutine starts.
Either reject non-positive values during config parsing in bigQueryWriteAPIConfigFromParsed (returning a clear error), or guard the ticker creation here. Same risk applies to stream_idle_timeout if zero is treated as "always idle" — worth either documenting or rejecting.
    httpEndpoint := fmt.Sprintf("http://localhost:%s", httpPort.Port())
    grpcEndpoint := fmt.Sprintf("localhost:%s", grpcPort.Port())
The container host is hardcoded to localhost, but the tester pattern is to resolve it via ctr.Host(t.Context()) (e.g. as done in postgresql/integration_test.go#L71). Using the resolved host makes the test work in non-localhost test environments (remote Docker, Podman with mapped hosts, etc.). Replace both localhost substitutions with the value returned by ctr.Host.
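A small sketch of how the endpoints could be built once the host has been resolved. `emulatorEndpoints` is a hypothetical helper, and the host string passed to it stands in for whatever `ctr.Host(t.Context())` returns in the test. It uses `net.JoinHostPort` so IPv6 hosts are bracketed correctly.

```go
package main

import (
	"fmt"
	"net"
)

// emulatorEndpoints is a hypothetical helper that builds the HTTP and gRPC
// endpoints from a resolved container host and its mapped ports.
func emulatorEndpoints(host, httpPort, grpcPort string) (httpEndpoint, grpcEndpoint string) {
	httpEndpoint = "http://" + net.JoinHostPort(host, httpPort)
	grpcEndpoint = net.JoinHostPort(host, grpcPort)
	return httpEndpoint, grpcEndpoint
}

func main() {
	// In the integration test, the host would come from the container handle
	// rather than a literal; the value here is made up for illustration.
	httpEndpoint, grpcEndpoint := emulatorEndpoints("docker.example.internal", "9050", "9060")
	fmt.Println(httpEndpoint)
	fmt.Println(grpcEndpoint)
}
```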
New BatchOutput backed by the BigQuery Storage Write API for higher throughput and lower latency than the legacy streaming API.
- JSON or protobuf input with auto schema fetch + proto conversion
- Per-table managed stream cache with idle eviction sweeper
- gRPC error classification: permanent failures returned as BatchError
- Service-account impersonation via target_principal + delegates
- Configurable stream idle timeout and sweep interval
- Endpoint overrides for local emulators
- Prometheus-style _total counter metrics + batch latency timer