What happened?
// Configure a streaming BigQuery write that uses load jobs (FILE_LOADS),
// triggered on a fixed frequency, with auto-sharding, a temp dataset for
// staging tables, and a custom GCS temp location for the staged files.
BigQueryIO.Write<Row> batchWrite =
    rowWrite
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(fileLoadSpecOptional.get().writeTriggerFrequency))
        .withAutoSharding()
        .withExtendedErrorInfo()
        .withMaxRetryJobs(fileLoadSpecOptional.get().withJobRetryCount)
        .withWriteTempDataset(TMP_TABLE_DATASET)
        .withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(fileLoadSpecOptional.get().stagingBucket));
The code above sets up the file-load operation for a Dataflow streaming job that ingests files into BigQuery based on GCS file notifications.
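For context, rowWrite and fileLoadSpecOptional are not shown in the snippet. A minimal sketch of what rowWrite might look like (the destination table, dispositions, and schema handling are assumptions, not from the original report):

// Hypothetical setup for the base write transform referenced above.
BigQueryIO.Write<Row> rowWrite =
    BigQueryIO.<Row>write()
        .to("my-project:my_dataset.vpc_flow_logs_live")  // assumed live table
        .useBeamSchema()                                 // rows are assumed to carry a Beam schema
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);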
- It creates a job ID for the new file after a few minutes and loads the data into the live table under that job ID, e.g.:
beam_bq_job_LOAD_defaultnetworkvpcflowlogslive_e78561b1b6e147e7abafe83b9314c7f1_41dd2dbb7ce5b6e717e5aadb4244b4e0_00001_00000-0
but after this, for every new set of files added to the staging bucket, there is exactly 1 partition and c.pane().getIndex() appears to be 0, which produces the same job_id and causes the file writes to be ignored.
See the screenshot: at different timestamps we get the same job_id, so the inserts are ignored.
Should we add a random UUID to the job ID logic to fix this issue, by changing this line to:
String jobIdPrefix =
    BigQueryResourceNaming.createJobIdWithDestination(
            c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex())
        + "-" + random5CharUUID();
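random5CharUUID() is not an existing Beam helper; it is only the proposed suffix generator. A minimal sketch of what it could look like (hypothetical name and implementation):

// Hypothetical helper, not part of Beam: returns the first 5 characters of a
// random UUID so that repeated loads for the same destination/partition/pane
// index get distinct job IDs.
private static String random5CharUUID() {
  return java.util.UUID.randomUUID().toString().replace("-", "").substring(0, 5);
}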
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner